diff --git a/metadata-ingestion/examples/structured_properties/list_structured_properties.py b/metadata-ingestion/examples/structured_properties/list_structured_properties.py new file mode 100644 index 0000000000000..66ac90c1228a3 --- /dev/null +++ b/metadata-ingestion/examples/structured_properties/list_structured_properties.py @@ -0,0 +1,12 @@ +# Usage: python3 list_structured_properties.py +# Expected Output: List of structured properties +# This script lists all structured properties in DataHub +from datahub.api.entities.structuredproperties.structuredproperties import ( + StructuredProperties, +) +from datahub.ingestion.graph.client import get_default_graph + +with get_default_graph() as graph: + structuredproperties = StructuredProperties.list(graph) + for structuredproperty in structuredproperties: + print(structuredproperty.dict()) diff --git a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py index 619f69b016262..179dbdb231c91 100644 --- a/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py +++ b/metadata-ingestion/src/datahub/api/entities/structuredproperties/structuredproperties.py @@ -1,7 +1,7 @@ import logging from enum import Enum from pathlib import Path -from typing import List, Optional +from typing import Iterable, List, Optional import yaml from pydantic import validator @@ -226,3 +226,14 @@ def to_yaml( yaml.indent(mapping=2, sequence=4, offset=2) yaml.default_flow_style = False yaml.dump(self.dict(), fp) + + @staticmethod + def list_urns(graph: DataHubGraph) -> Iterable[str]: + return graph.get_urns_by_filter( + entity_types=["structuredProperty"], + ) + + @staticmethod + def list(graph: DataHubGraph) -> Iterable["StructuredProperties"]: + for urn in StructuredProperties.list_urns(graph): + yield StructuredProperties.from_datahub(graph, urn) diff --git a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py index 42285cf13a5dd..5cd28516a076d 100644 --- a/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py +++ b/metadata-ingestion/src/datahub/cli/specific/structuredproperties_cli.py @@ -1,9 +1,11 @@ import json import logging from pathlib import Path +from typing import Iterable import click from click_default_group import DefaultGroup +from ruamel.yaml import YAML from datahub.api.entities.structuredproperties.structuredproperties import ( StructuredProperties, @@ -61,3 +63,85 @@ def get(urn: str, to_file: str) -> None: ) else: click.secho(f"Structured property {urn} does not exist") + + +@properties.command( + name="list", +) +@click.option("--details/--no-details", is_flag=True, default=True) +@click.option("--to-file", required=False, type=str) +@telemetry.with_telemetry() +def list(details: bool, to_file: str) -> None: + """List structured properties in DataHub""" + + def to_yaml_list( + objects: Iterable[StructuredProperties], # iterable of objects to dump + file: Path, + ) -> None: + # if file exists, first we read it + yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip) + yaml.indent(mapping=2, sequence=4, offset=2) + yaml.default_flow_style = False + serialized_objects = [] + if file.exists(): + with open(file, "r") as fp: + existing_objects = yaml.load(fp) # this is a list of dicts + existing_objects = [ + StructuredProperties.parse_obj(obj) for obj in existing_objects + ] + objects = [obj for obj in objects] + # do a positional update of the existing objects + existing_urns = {obj.urn for obj in existing_objects} + # existing_urns = {obj["urn"] if "urn" in obj else f"urn:li:structuredProperty:{obj['id']}" for obj in existing_objects} + for i, obj in enumerate(existing_objects): + # existing_urn = obj["urn"] if "urn" in obj else f"urn:li:structuredProperty:{obj['id']}" + existing_urn = obj.urn + # breakpoint() + if existing_urn in {obj.urn for obj in objects}: + existing_objects[i] = next( + obj.dict(exclude_unset=True, exclude_none=True) + for obj in objects + if obj.urn == existing_urn + ) + new_objects = [ + obj.dict(exclude_unset=True, exclude_none=True) + for obj in objects + if obj.urn not in existing_urns + ] + serialized_objects = existing_objects + new_objects + else: + serialized_objects = [ + obj.dict(exclude_unset=True, exclude_none=True) for obj in objects + ] + + with open(file, "w") as fp: + yaml.dump(serialized_objects, fp) + + with get_default_graph() as graph: + if details: + logger.info( + "Listing structured properties with details. Use --no-details for urns only" + ) + structuredproperties = StructuredProperties.list(graph) + if to_file: + to_yaml_list(structuredproperties, Path(to_file)) + else: + for structuredproperty in structuredproperties: + click.secho( + f"{json.dumps(structuredproperty.dict(exclude_unset=True, exclude_none=True), indent=2)}" + ) + else: + logger.info( + "Listing structured property urns only, use --details for more information" + ) + structured_property_urns = StructuredProperties.list_urns(graph) + if to_file: + with open(to_file, "w") as f: + for urn in structured_property_urns: + f.write(f"{urn}\n") + click.secho( + f"Structured property urns written to {to_file}", fg="green" + ) + else: + for urn in structured_property_urns: + click.secho(f"{urn}")