Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Jul 3, 2024
2 parents 01e20a5 + 581dc7f commit 99a38b7
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 63 deletions.
4 changes: 3 additions & 1 deletion datahub-web-react/src/app/ingest/source/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ export const getExecutionRequestStatusDisplayColor = (status: string) => {
export const validateURL = (fieldName: string) => {
return {
validator(_, value) {
const URLPattern = new RegExp(/^(?:http(s)?:\/\/)?[\w.-]+(?:\.[a-zA-Z0-9.-]{2,})+[\w\-._~:/?#[\]@!$&'()*+,;=.]+$/);
const URLPattern = new RegExp(
/^(?:http(s)?:\/\/)?[\w.-]+(?:\.[a-zA-Z0-9.-]{2,})+[\w\-._~:/?#[\]@!$&'()*+,;=.]+$/,
);
const isURLValid = URLPattern.test(value);
if (!value || isURLValid) {
return Promise.resolve();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export default function PolicyDetailsModal({ policy, visible, onClose, privilege

const resources = convertLegacyResourceFilter(policy?.resources);
const resourceTypes = getFieldValues(resources?.filter, 'TYPE') || [];
const dataPlatformInstances = getFieldValues(resources?.filter, 'DATA_PLATFORM_INSTANCE') || [];
const resourceEntities = getFieldValues(resources?.filter, 'URN') || [];
const domains = getFieldValues(resources?.filter, 'DOMAIN') || [];

Expand Down Expand Up @@ -170,6 +171,20 @@ export default function PolicyDetailsModal({ policy, visible, onClose, privilege
);
})) || <PoliciesTag>All</PoliciesTag>}
</div>
{dataPlatformInstances?.length > 0 && (
<div>
<Typography.Title level={5}>Data Platform Instances</Typography.Title>
<ThinDivider />
{dataPlatformInstances.map((value, key) => {
return (
// eslint-disable-next-line react/no-array-index-key
<PoliciesTag key={`dataPlatformInstance-${value.value}-${key}`}>
<Typography.Text>{getDisplayName(value.entity)}</Typography.Text>
</PoliciesTag>
);
})}
</div>
)}
<div>
<Typography.Title level={5}>Domains</Typography.Title>
<ThinDivider />
Expand Down
111 changes: 111 additions & 0 deletions docs/advanced/patch.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,114 @@ Here are a few illustrative examples using the Python Patch builders:
</TabItem>
</Tabs>


## How Patch works

To understand how patching works, it's important to understand a bit about our [models](../what/aspect.md). Entities are comprised of Aspects
which can be reasoned about as JSON representations of the object models. To be able to patch these we utilize [JsonPatch](https://jsonpatch.com/). The components of a JSON Patch are the path, operation, and value.

### Path

The JSON path refers to a value within the schema. This can be a single field or can be an entire object reference depending on what the path is.
For our patches we are primarily targeting single fields or even single array elements within a field. To be able to target array elements by id, we go through a translation process
of the schema to transform arrays into maps. This allows a path to reference a particular array element by key rather than by index, for example a specific tag urn being added to a dataset.
This is important to note that for some fields in our schema that are arrays which do not necessarily restrict uniqueness, this puts a uniqueness constraint on the key.
The key for objects stored in arrays is determined manually by examining the schema and a long term goal is to make these keys annotation driven to reduce the amount of code needed to support
additional aspects to be patched. There is a generic patch endpoint, but it requires any array field keys to be specified at request time, putting a lot of burden on the API user.

#### Examples

A patch path for targeting an upstream dataset:

`/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)`

Breakdown:
* `/upstreams` -> References the upstreams field of the UpstreamLineage aspect, this is an array of Upstream objects where the key is the Urn
* `/urn:...` -> The dataset to be targeted by the operation


A patch path for targeting a fine grained lineage upstream:

`/fineGrainedLineages/TRANSFORM/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),foo)/urn:li:query:queryId/urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD),bar)`

Breakdown:
* `/fineGrainedLineages` -> References the fineGrainedLineages field on an UpstreamLineage, this is an array of FineGrainedLineage objects keyed on transformOperation, downstream urn, and query urn
* `/TRANSFORM` -> transformOperation, one of the fields determining the key for a fineGrainedLineage
* `/urn:li:schemaField:...` -> The downstream schemaField referenced in this schema, part of the key for a fineGrainedLineage
* `/urn:li:query:...` -> The query urn this relationship was derived from, part of the key for a fineGrainedLineage
* `/urn:li:schemaField:` -> The upstream urn that is being targeted by this patch operation

This showcases that in some cases the key for objects is simple, in others in can be complex to determine, but for our fully supported use cases we have
SDK support on both the Java and Python side that will generate these patches for you as long as you supply the required method parameters.
Path is generally the most complicated portion of a patch to reason about as it requires intimate knowledge of the schema and models.

### Operation

Operation is a limited enum of a few supported types pulled directly from the JSON Patch spec. DataHub only supports `ADD` and `REMOVE` of these options
as the other patch operations do not currently have a use case within our system.

#### Add

Add is a bit of a misnomer for the JSON Patch spec, it is not an explicit add but an upsert/replace. If the path specified does not exist, it will be created,
but if the path already exists the value will be replaced. Patch operations apply at a path level so it is possible to do full replaces of arrays or objects in the schema
using adds, but generally the most useful use case for patch is to add elements to arrays without affecting the other elements as full upserts are supported by standard ingestion.

#### Remove

Remove operations require the path specified to be present, or an error will be thrown, otherwise they operate as one would expect. The specified path will be removed from the aspect.


### Value

Value is the actual information that will be stored at a path. If the path references an object then this will include the JSON key value pairs for that object.

#### Examples

An example UpstreamLineage object value:

```json
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,my-bucket/my-folder/my-file.txt,PROD)",
"type": "TRANSFORMED"
}
```

For the previous path example (`/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)`), this object would represent the UpstreamLineage object for that path.
This specifies the required fields to properly represent that object. Note: by modifying this path, you could reference a single field within the UpstreamLineage object itself, like so:

```json
{
"path": "/upstreams/urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)/type",
"op": "ADD",
"value": "VIEW"
}
```

### Implementation details

#### Template Classes

Template classes are the mechanism that maps fields to their corresponding JSON paths. Since DataMaps are not true JSON, first we convert a RecordTemplate to a JSON String,
perform any additional process to map array fields to their keys, apply the patch, and then convert the JSON object back to a RecordTemplate to work with the rest of the application.

The template classes we currently support can be found in the `entity-registry` module. They are split up by aspect, with the GenericTemplate applying to any non-directly supported aspects.
The GenericTemplate allows for use cases that we have not gotten around to directly support yet, but puts more burden on the user to generate patches correctly.

The template classes are utilized in `EntityServiceImpl` where a MCP is determined to be either a patch or standard upsert which then routes through to the stored templates registered on the EntityRegistry.
The core logical flow each Template runs through is set up in the `Template` interface, with some more specific logic in the lower level interfaces for constructing/deconstructing array field keys.
Most of the complexity around these classes is knowledge of schema and JSON path traversals.

##### ArrayMergingTemplate & CompoundKeyTemplate

`ArrayMergingTemplate` is utilized for any aspect which has array fields and may either be used directly or use `CompoundKeyTemplate`. `ArrayMergingTemplate` is the simpler one that can only be used directly for
single value keys. `CompoundKeyTemplate` allows for support of multi-field keys. For more complex examples like FineGrainedLineage, further logic is needed to construct a key as it is not generalizable to other aspects, see `UpstreamLineageTemplate` for full special case implementation.

#### PatchBuilders

There are patch builder SDK classes for constructing patches in both Java and Python. The Java patch builders all extend `AbstractMultiFieldPatchBuilder` which sets up the
base functionality for patch builder subtypes. Each implementation of this abstract class is targeted at a particular aspect and contains specific field based update methods
for the most common use cases. On the Python side patch builders live in the `src/specific/` directory and are organized by entity type.
36 changes: 20 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from collections import defaultdict
from dataclasses import dataclass, field as dataclass_field
from functools import lru_cache
from typing import (
Any,
DefaultDict,
Expand Down Expand Up @@ -118,9 +119,10 @@ class GlueSourceConfig(
description=f"The platform to use for the dataset URNs. Must be one of {VALID_PLATFORMS}.",
)

# https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-glue-table-tableinput.html#cfn-glue-table-tableinput-owner
extract_owners: Optional[bool] = Field(
default=True,
description="When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, ownership is left empty for datasets.",
description="When enabled, extracts ownership from Glue table property and overwrites existing owners (DATAOWNER). When disabled, ownership is left empty for datasets. Expects a corpGroup urn, a corpuser urn or only the identifier part for the latter. Not used in the normal course of AWS Glue operations.",
)
extract_transforms: Optional[bool] = Field(
default=True, description="Whether to extract Glue transform jobs."
Expand Down Expand Up @@ -1061,20 +1063,6 @@ def _extract_record(
f"extract record from table={table_name} for dataset={dataset_urn}"
)

def get_owner() -> Optional[OwnershipClass]:
owner = table.get("Owner")
if owner:
owners = [
OwnerClass(
owner=f"urn:li:corpuser:{owner}",
type=OwnershipTypeClass.DATAOWNER,
)
]
return OwnershipClass(
owners=owners,
)
return None

def get_dataset_properties() -> DatasetPropertiesClass:
return DatasetPropertiesClass(
description=table.get("Description"),
Expand Down Expand Up @@ -1284,6 +1272,20 @@ def get_data_platform_instance() -> DataPlatformInstanceClass:
else None,
)

@lru_cache(maxsize=None)
def _get_ownership(owner: str) -> Optional[OwnershipClass]:
if owner:
owners = [
OwnerClass(
owner=mce_builder.make_user_urn(owner),
type=OwnershipTypeClass.DATAOWNER,
)
]
return OwnershipClass(
owners=owners,
)
return None

dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[
Expand All @@ -1298,8 +1300,10 @@ def get_data_platform_instance() -> DataPlatformInstanceClass:

dataset_snapshot.aspects.append(get_data_platform_instance())

# Ownership
if self.extract_owners:
optional_owner_aspect = get_owner()
owner = table.get("Owner")
optional_owner_aspect = _get_ownership(owner)
if optional_owner_aspect is not None:
dataset_snapshot.aspects.append(optional_owner_aspect)

Expand Down
58 changes: 14 additions & 44 deletions metadata-ingestion/src/datahub/specific/datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,45 +378,29 @@ def set_output_datasets(self, outputs: List[Edge]) -> "DataJobPatchBuilder":
)
return self

def add_input_dataset_field(
self, input: Union[Edge, Urn, str]
) -> "DataJobPatchBuilder":
def add_input_dataset_field(self, input: Union[Urn, str]) -> "DataJobPatchBuilder":
"""
Adds an input dataset field to the DataJobPatchBuilder.
Args:
input: The input dataset field, which can be an Edge object, Urn object, or a string.
input: The input dataset field, which can be an Urn object, or a string.
Returns:
The DataJobPatchBuilder instance.
Raises:
ValueError: If the input is not a Schema Field urn.
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
input_edge: Edge = input
elif isinstance(input, (Urn, str)):
input_urn = str(input)
if not input_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
input_urn = str(input)
urn = Urn.create_from_string(input_urn)
if not urn.get_type() == "schemaField":
raise ValueError(f"Input {input} is not a Schema Field urn")

self._ensure_urn_type("schemaField", [input_edge], "add_input_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/inputDatasetFields/{self.quote(input_urn)}",
value=input_edge,
value={},
)
return self

Expand Down Expand Up @@ -467,44 +451,30 @@ def set_input_dataset_fields(self, inputs: List[Edge]) -> "DataJobPatchBuilder":
return self

def add_output_dataset_field(
self, output: Union[Edge, Urn, str]
self, output: Union[Urn, str]
) -> "DataJobPatchBuilder":
"""
Adds an output dataset field to the DataJobPatchBuilder.
Args:
output: The output dataset field, which can be an Edge object, Urn object, or a string.
output: The output dataset field, which can be an Urn object, or a string.
Returns:
The DataJobPatchBuilder instance.
Raises:
ValueError: If the output is not a Schema Field urn.
Notes:
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
"""
if isinstance(output, Edge):
output_urn: str = output.destinationUrn
output_edge: Edge = output
elif isinstance(output, (Urn, str)):
output_urn = str(output)
if not output_urn.startswith("urn:li:schemaField:"):
raise ValueError(f"Input {input} is not a Schema Field urn")

output_edge = Edge(
destinationUrn=output_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)
output_urn = str(output)
urn = Urn.create_from_string(output_urn)
if not urn.get_type() == "schemaField":
raise ValueError(f"Input {output} is not a Schema Field urn")

self._ensure_urn_type("schemaField", [output_edge], "add_output_dataset_field")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/outputDatasetFields/{self.quote(output_urn)}",
value=output_edge,
value={},
)
return self

Expand Down
12 changes: 11 additions & 1 deletion metadata-integration/java/custom-plugin-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ configurations.all {
exclude group: 'org.antlr'
}

task sourcesJar(type: Jar) {
archiveClassifier = 'sources'
from sourceSets.main.allSource
}

task javadocJar(type: Jar) {
archiveClassifier = 'javadoc'
from javadoc
}

shadowJar {
zip64 = true
archiveClassifier = ''
Expand All @@ -59,7 +69,7 @@ publishing {
artifactId = 'datahub-custom-plugin-lib'
description = 'DataHub Java Custom Plugin dependencies'
url = 'https://datahubproject.io'
artifacts = [shadowJar]
artifacts = [shadowJar, javadocJar, sourcesJar]

scm {
connection = 'scm:git:git://github.com/datahub-project/datahub.git'
Expand Down
2 changes: 1 addition & 1 deletion smoke-test/tests/dataproduct/test_dataproduct.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def create_test_data(filename: str):

@pytest.fixture(scope="module", autouse=False)
def ingest_cleanup_data(request):
new_file, filename = tempfile.mkstemp()
new_file, filename = tempfile.mkstemp(suffix=".json")
try:
create_test_data(filename)
print("ingesting data products test data")
Expand Down

0 comments on commit 99a38b7

Please sign in to comment.