Merge branch 'master' into bigquery-queries
mayurinehate authored Aug 23, 2024
2 parents e0f8434 + e1a2908 commit 6355683
Showing 41 changed files with 17,021 additions and 2,773 deletions.
8 changes: 0 additions & 8 deletions docs-website/docusaurus.config.js
@@ -170,14 +170,6 @@ module.exports = {
value: '<div class="dropdown__link"><b>Archived versions</b></div>',
},
{
value: `
<a class="dropdown__link" href="https://docs-website-qou70o69f-acryldata.vercel.app/docs/features">0.14.0
<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg>
</a>
`,
type: "html",
},
{
value: `
<a class="dropdown__link" href="https://docs-website-lzxh86531-acryldata.vercel.app/docs/features">0.13.0
<svg width="12" height="12" aria-hidden="true" viewBox="0 0 24 24"><path fill="currentColor" d="M21 13v10h-21v-19h12v2h-10v15h17v-8h2zm3-12h-10.988l4.035 4-6.977 7.07 2.828 2.828 6.977-7.07 4.125 4.172v-11z"></path></svg>
21 changes: 16 additions & 5 deletions docs-website/download_historical_versions.py
@@ -3,6 +3,7 @@
import tarfile
import time
import urllib.request
import shutil

repo_url = "https://api.github.com/repos/datahub-project/static-assets"

@@ -18,7 +19,7 @@ def download_file(url, destination):


def fetch_urls(
repo_url: str, folder_path: str, file_format: str, max_retries=3, retry_delay=5
repo_url: str, folder_path: str, file_format: str, active_versions: list, max_retries=3, retry_delay=5
):
api_url = f"{repo_url}/contents/{folder_path}"
for attempt in range(max_retries + 1):
@@ -30,7 +31,7 @@ def fetch_urls(
urls = [
file["download_url"]
for file in json.loads(data)
if file["name"].endswith(file_format)
if file["name"].endswith(file_format) and any(version in file["name"] for version in active_versions)
]
print(urls)
return urls
@@ -48,12 +49,22 @@ def extract_tar_file(destination_path):
tar.extractall()
os.remove(destination_path)

def get_active_versions():
# read versions.json
with open("versions.json") as f:
versions = json.load(f)
return versions

def clear_directory(directory):
if os.path.exists(directory):
shutil.rmtree(directory)
os.makedirs(directory)

def download_versioned_docs(folder_path: str, destination_dir: str, file_format: str):
if not os.path.exists(destination_dir):
os.makedirs(destination_dir)
clear_directory(destination_dir) # Clear the directory before downloading

urls = fetch_urls(repo_url, folder_path, file_format)
active_versions = get_active_versions()
urls = fetch_urls(repo_url, folder_path, file_format, active_versions)

for url in urls:
filename = os.path.basename(url)
33 changes: 29 additions & 4 deletions metadata-ingestion/docs/sources/s3/s3.md
@@ -3,16 +3,31 @@

Path Specs (`path_specs`) is a list of Path Spec (`path_spec`) objects, where each individual `path_spec` represents one or more datasets. The include path (`path_spec.include`) represents the formatted path to the dataset. This path must end with `*.*` or `*.[ext]` to represent the leaf level. If `*.[ext]` is provided, only files with the specified extension type will be scanned. "`.[ext]`" can be any of the [supported file types](#supported-file-types). Refer to [example 1](#example-1---individual-file-as-dataset) below for more details.

All folder levels need to be specified in include path. You can use `/*/` to represent a folder level and avoid specifying exact folder name. To map folder as a dataset, use `{table}` placeholder to represent folder level for which dataset is to be created. For a partitioned dataset, you can use placeholder `{partition_key[i]}` to represent name of `i`th partition and `{partition[i]}` to represent value of `i`th partition. During ingestion, `i` will be used to match partition_key to partition. Refer [example 2 and 3](#example-2---folder-of-files-as-dataset-without-partitions) below for more details.
All folder levels need to be specified in the include path. You can use `/*/` to represent a folder level and avoid specifying the exact folder name. To map a folder as a dataset, use the `{table}` placeholder to represent the folder level for which the dataset is to be created. For a partitioned dataset, you can use the placeholder `{partition_key[i]}` to represent the name of the `i`th partition and `{partition_value[i]}` to represent the value of the `i`th partition. During ingestion, `i` will be used to match the partition key to its partition value. Refer to [examples 2 and 3](#example-2---folder-of-files-as-dataset-without-partitions) below for more details.

Exclude paths (`path_spec.exclude`) can be used to ignore paths that are not relevant to the current `path_spec`. This path cannot have named variables (`{}`). An exclude path can have `**` to represent multiple folder levels. Refer to [example 4](#example-4---folder-of-files-as-dataset-with-partitions-and-exclude-filter) below for more details.

Refer to [example 5](#example-5---advanced---either-individual-file-or-folder-of-files-as-dataset) if your bucket has a more complex dataset representation.
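
Putting these rules together, a minimal sketch of a `path_specs` entry (the bucket name, folder layout, and `_temporary` exclude are illustrative, not taken from this page):
```
path_specs:
  - include: s3://my-bucket/data/{table}/*.parquet
    exclude:
      - "**/_temporary/**"
```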


**Additional points to note**
- Folder names should not contain `{`, `}`, `*`, or `/`.
- The named variable `{folder}` is reserved for internal use; please do not use it as a named variable.

#### Partitioned Dataset support
If your dataset is partitioned in the `partition_key`=`partition_value` format, the partition values are auto-detected.

Otherwise, you can specify partitions in the following ways in the `path_spec` (see the sketch after this list):
1. Specify the partition key and value in the path, e.g. `{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/{partition_key[2]}={partition_value[2]}`
2. Specify the partition keys using named variables in the `path_spec`, e.g. `year={year}/month={month}/day={day}`
3. If the path is in the form `/value1/value2/value3`, the source infers the partition values from the path and assigns `partition_0`, `partition_1`, `partition_2`, etc. as the partition keys.
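
As referenced in the list above, a sketch of the three styles side by side, one `path_spec` per style (the bucket, table, and key names are illustrative):
```
path_specs:
  # 1. Explicit partition key/value placeholders
  - include: s3://my-bucket/{table}/{partition_key[0]}={partition_value[0]}/*.parquet
  # 2. Named variables for known partition keys
  - include: s3://my-bucket/{table}/year={year}/month={month}/day={day}/*.parquet
  # 3. Bare value levels; keys are auto-assigned as partition_0, partition_1, ...
  - include: s3://my-bucket/{table}/*/*/*.parquet
```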

The dataset creation time is determined by the creation time of the earliest created file in the lowest partition, while the last updated time is determined by the last updated time of the most recently updated file in the highest partition.

How the source determines the highest/lowest partition is based on the traversal method set in the `path_spec` (a config sketch follows this list):
- If the traversal method is set to `MAX`, the source finds the latest partition by ordering the partitions at each level. This traversal method does not look for the earliest partition or creation time, but it is the fastest.
- If the traversal method is set to `MIN_MAX`, the source finds the latest and earliest partitions by ordering the partitions at each level. This traversal sorts folders purely by name, so it is fast, but it does not guarantee that the latest partition contains the most recently created file.
- If the traversal method is set to `ALL`, the source finds the latest and earliest partitions by listing all the files in all the partitions and determining the creation/last-modification times from the files themselves. This is the slowest method, but for non-time-partitioned datasets it is the only way to find the latest/earliest partition.
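
As a config sketch of the above, assuming the option is exposed on each `path_spec` as a `traversal_method` field (the field name is an assumption, not confirmed by this page; the three values are the methods described above):
```
path_specs:
  - include: s3://my-bucket/{table}/year={year}/month={month}/*.parquet
    # Assumed field name: MAX is fastest; ALL is slowest but works for
    # non-time-partitioned datasets
    traversal_method: MIN_MAX
```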

### Path Specs - Examples
#### Example 1 - Individual file as Dataset
@@ -73,7 +88,12 @@ test-bucket
Path specs config to ingest folders `orders` and `returns` as datasets:
```
path_specs:
- include: s3://test-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
- include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
```
or with partition auto-detection:
```
path_specs:
- include: s3://test-bucket/{table}/
```

One can also use `include: s3://test-bucket/{table}/*/*/*.parquet` here; however, the above format is preferred, as it allows declaring partitions explicitly.
@@ -99,11 +119,15 @@ test-bucket
Path specs config to ingest folder `orders` as dataset but not folder `tmp_orders`:
```
path_specs:
- include: s3://test-bucket/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
- include: s3://test-bucket/{table}/{partition_key[0]}={partition_value[0]}/{partition_key[1]}={partition_value[1]}/*.parquet
exclude:
- "**/tmp_orders/**"
```

or with partition auto-detection:
```
path_specs:
- include: s3://test-bucket/{table}/
```

#### Example 5 - Advanced - Either Individual file OR Folder of files as Dataset

@@ -150,6 +174,7 @@ Above config has 3 path_specs and will ingest following datasets
s3://my-bucket/foo/tests/bar.avro # single file table
s3://my-bucket/foo/tests/*.* # multiple file level tables
s3://my-bucket/foo/tests/{table}/*.avro #table without partition
s3://my-bucket/foo/tests/{table}/ # table with partition auto-detection. Partitions can only be detected if they are in the key=value format
s3://my-bucket/foo/tests/{table}/*/*.avro #table where partitions are not specified
s3://my-bucket/foo/tests/{table}/*.* # table where no partitions as well as data type specified
s3://my-bucket/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
27 changes: 22 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/aws/aws_common.py
@@ -1,3 +1,4 @@
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import boto3
@@ -73,6 +74,8 @@ class AwsConnectionConfig(ConfigModel):
- dbt source
"""

_credentials_expiration: Optional[datetime] = None

aws_access_key_id: Optional[str] = Field(
default=None,
description=f"AWS access key ID. {AUTODETECT_CREDENTIALS_DOC_LINK}",
@@ -115,6 +118,11 @@ class AwsConnectionConfig(ConfigModel):
description="Advanced AWS configuration options. These are passed directly to [botocore.config.Config](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html).",
)

def allowed_cred_refresh(self) -> bool:
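    # Credential refresh is only relevant when this config performs role assumption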
if self._normalized_aws_roles():
return True
return False

def _normalized_aws_roles(self) -> List[AwsAssumeRoleConfig]:
if not self.aws_role:
return []
@@ -153,11 +161,14 @@ def get_session(self) -> Session:
}

for role in self._normalized_aws_roles():
credentials = assume_role(
role,
self.aws_region,
credentials=credentials,
)
if self._should_refresh_credentials():
credentials = assume_role(
role,
self.aws_region,
credentials=credentials,
)
if isinstance(credentials["Expiration"], datetime):
self._credentials_expiration = credentials["Expiration"]

session = Session(
aws_access_key_id=credentials["AccessKeyId"],
@@ -168,6 +179,12 @@ def get_session(self) -> Session:

return session

def _should_refresh_credentials(self) -> bool:
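    # Returns True when no expiration is cached or the credentials expire within 5 minutes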
if self._credentials_expiration is None:
return True
remaining_time = self._credentials_expiration - datetime.now(timezone.utc)
return remaining_time < timedelta(minutes=5)

def get_credentials(self) -> Dict[str, Optional[str]]:
credentials = self.get_session().get_credentials()
if credentials is not None:
20 changes: 18 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/sagemaker.py
@@ -1,5 +1,5 @@
from collections import defaultdict
from typing import DefaultDict, Dict, Iterable, List, Optional
from typing import TYPE_CHECKING, DefaultDict, Dict, Iterable, List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -33,6 +33,9 @@
StatefulIngestionSourceBase,
)

if TYPE_CHECKING:
from mypy_boto3_sagemaker import SageMakerClient


@platform_name("SageMaker")
@config_class(SagemakerSourceConfig)
@@ -56,6 +59,7 @@ def __init__(self, config: SagemakerSourceConfig, ctx: PipelineContext):
self.report = SagemakerSourceReport()
self.sagemaker_client = config.sagemaker_client
self.env = config.env
self.client_factory = ClientFactory(config)

@classmethod
def create(cls, config_dict, ctx):
@@ -92,7 +96,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
# extract jobs if specified
if self.source_config.extract_jobs is not False:
job_processor = JobProcessor(
sagemaker_client=self.sagemaker_client,
sagemaker_client=self.client_factory.get_client,
env=self.env,
report=self.report,
job_type_filter=self.source_config.extract_jobs,
@@ -118,3 +122,15 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

def get_report(self):
return self.report


class ClientFactory:
def __init__(self, config: SagemakerSourceConfig):
self.config = config
self._cached_client = self.config.sagemaker_client

def get_client(self) -> "SageMakerClient":
if self.config.allowed_cred_refresh():
# Always fetch the client dynamically with auto-refresh logic
return self.config.sagemaker_client
return self._cached_client
@@ -4,6 +4,7 @@
from typing import (
TYPE_CHECKING,
Any,
Callable,
DefaultDict,
Dict,
Iterable,
@@ -147,7 +148,7 @@ class JobProcessor:
"""

# boto3 SageMaker client
sagemaker_client: "SageMakerClient"
sagemaker_client: Callable[[], "SageMakerClient"]
env: str
report: SagemakerSourceReport
# config filter for specific job types to ingest (see metadata-ingestion README)
@@ -170,8 +171,7 @@ class JobProcessor:

def get_jobs(self, job_type: JobType, job_spec: JobInfo) -> List[Any]:
jobs = []

paginator = self.sagemaker_client.get_paginator(job_spec.list_command)
paginator = self.sagemaker_client().get_paginator(job_spec.list_command)
for page in paginator.paginate():
page_jobs: List[Any] = page[job_spec.list_key]

@@ -269,7 +269,7 @@ def get_job_details(self, job_name: str, job_type: JobType) -> Dict[str, Any]:
describe_command = job_type_to_info[job_type].describe_command
describe_name_key = job_type_to_info[job_type].describe_name_key

return getattr(self.sagemaker_client, describe_command)(
return getattr(self.sagemaker_client(), describe_command)(
**{describe_name_key: job_name}
)

