diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index cc50505..69bfd67 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -19,9 +19,6 @@ jobs:
       - name: Setup
         run: |
           just setup
-      - name: Lint
-        run: |
-          just lint
       - name: Test
         run: |
-          just test
+          just lint test
diff --git a/impact_analysis.md b/impact_analysis.md
new file mode 100644
index 0000000..902792b
--- /dev/null
+++ b/impact_analysis.md
@@ -0,0 +1,5 @@
+## Acryl Impact Analysis
+
+Failed to run impact analysis: 'DATAHUB_GMS_HOST'
+
+See the logs for full details.
diff --git a/justfile b/justfile
index 4b76b87..2886671 100644
--- a/justfile
+++ b/justfile
@@ -1,6 +1,6 @@
 test: lint
-    # TODO
+    venv/bin/pytest tests
 
 setup:
     # Create venv.
 
diff --git a/src/impact_analysis.py b/src/impact_analysis.py
index 03590fc..a73bf5e 100644
--- a/src/impact_analysis.py
+++ b/src/impact_analysis.py
@@ -1,4 +1,5 @@
 import json
+import logging
 import os
 import pathlib
 import subprocess
@@ -8,18 +9,24 @@
 from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
 from datahub.metadata.schema_classes import DatasetPropertiesClass
 from datahub.telemetry import telemetry
-from datahub.utilities.urns.urn import Urn, guess_entity_type
 
-DATAHUB_SERVER = os.environ["DATAHUB_GMS_HOST"]
-DATAHUB_TOKEN: Optional[str] = os.getenv("DATAHUB_GMS_TOKEN")
-DATAHUB_FRONTEND_URL = os.environ["DATAHUB_FRONTEND_URL"]
+from rendering import datahub_url_from_urn, format_entity
 
 OUTPUT_PATH = pathlib.Path("impact_analysis.md")
 DBT_ID_PROP = "dbt_unique_id"
 MAX_IMPACTED_DOWNSTREAMS = 30
 MAX_DOWNSTREAMS_TO_FETCH = 1000
 
-graph = DataHubGraph(DatahubClientConfig(server=DATAHUB_SERVER, token=DATAHUB_TOKEN))
+
+def get_graph() -> DataHubGraph:
+    DATAHUB_SERVER = os.environ["DATAHUB_GMS_HOST"]
+    DATAHUB_TOKEN: Optional[str] = os.getenv("DATAHUB_GMS_TOKEN")
+
+    graph = DataHubGraph(
+        DatahubClientConfig(server=DATAHUB_SERVER, token=DATAHUB_TOKEN)
+    )
+
+    return graph
 
 
 class ImpactAnalysisError(Exception):
@@ -81,7 +88,7 @@ def determine_changed_dbt_models() -> List[DbtNodeInfo]:
         raise ImpactAnalysisError("Failed to parse dbt output") from e
 
 
-def find_datahub_urns(dbt_node_ids: List[str]) -> List[str]:
+def find_datahub_urns(graph: DataHubGraph, dbt_node_ids: List[str]) -> List[str]:
     if not dbt_node_ids:
         return []
 
@@ -108,7 +115,7 @@
     return urns
 
 
-def get_datahub_info(urn: str) -> Optional[DatasetPropertiesClass]:
+def get_datahub_info(graph: DataHubGraph, urn: str) -> Optional[DatasetPropertiesClass]:
     return graph.get_aspect(urn, DatasetPropertiesClass)
 
 
@@ -172,7 +179,7 @@
 """
 
 
-def get_impact_analysis(urn: str):
+def get_impact_analysis(graph: DataHubGraph, urn: str):
     result = graph.execute_graphql(
         IMPACT_ANALYSIS_QUERY,
         variables={
@@ -198,36 +205,11 @@
     return downstream_details
 
 
-def datahub_url_from_urn(urn: str, suffix: Optional[str] = None) -> str:
-    entity_type = guess_entity_type(urn)
-    if entity_type == "dataJob":
-        entity_type = "tasks"
-    elif entity_type == "dataFlow":
-        entity_type = "pipelines"
-
-    url = f"{DATAHUB_FRONTEND_URL}/{entity_type}/{Urn.url_encode(urn)}"
-    if suffix:
-        url += f"/{suffix}"
-    return url
-
-
-def format_entity(downstream: Dict) -> str:
-    platform = downstream["platform"]["name"]
-    if downstream["platform"].get("properties", {}).get("displayName"):
-        platform = downstream["platform"]["properties"]["displayName"]
-
-    name = downstream["properties"]["name"]
-    url = datahub_url_from_urn(downstream["urn"])
-
-    type: str = downstream["type"].capitalize()
-    if downstream.get("subTypes"):
-        type = downstream["subTypes"]["typeNames"][0]
-
-    return f"{platform} {type} [{name}]({url})"
-
-
 @telemetry.with_telemetry()
 def dbt_impact_analysis() -> str:
+    graph = get_graph()
+    DATAHUB_FRONTEND_URL = os.environ["DATAHUB_FRONTEND_URL"]
+
     # Step 1 - determine which dbt nodes are impacted by the changes in a given PR.
     changed_dbt_nodes = determine_changed_dbt_models()
     dbt_id_to_dbt_node = {node["unique_id"]: node for node in changed_dbt_nodes}
@@ -235,8 +217,8 @@ def dbt_impact_analysis() -> str:
 
     # Step 2 - map dbt nodes to datahub urns.
     # In an ideal world, the datahub urns for dbt would just be the dbt node ids.
-    urns = find_datahub_urns([node["unique_id"] for node in changed_dbt_nodes])
-    datahub_node_props = {urn: get_datahub_info(urn) for urn in urns}
+    urns = find_datahub_urns(graph, [node["unique_id"] for node in changed_dbt_nodes])
+    datahub_node_props = {urn: get_datahub_info(graph, urn) for urn in urns}
     urn_to_dbt_id = {
         urn: node.customProperties[DBT_ID_PROP]
         for urn, node in datahub_node_props.items()
@@ -245,7 +227,7 @@
     # print(urn_to_dbt_id)
 
     # Step 3 - generate downstream impact analysis for each datahub urn.
-    downstreams_report = {urn: get_impact_analysis(urn) for urn in urns}
+    downstreams_report = {urn: get_impact_analysis(graph, urn) for urn in urns}
 
     all_impacted_urns = {
         downstream["urn"]
@@ -263,15 +245,13 @@
     for urn, downstreams in downstreams_report.items():
         dbt_node = dbt_id_to_dbt_node[urn_to_dbt_id[urn]]
 
-        output += (
-            f"\n### [{dbt_node['original_file_path']}]({datahub_url_from_urn(urn)})\n\n"
-        )
+        output += f"\n### [{dbt_node['original_file_path']}]({datahub_url_from_urn(DATAHUB_FRONTEND_URL, urn)})\n\n"
         if downstreams:
             output += f"May impact **{len(downstreams)}** downstreams:\n"
             for downstream in downstreams[:MAX_IMPACTED_DOWNSTREAMS]:
-                output += f"- {format_entity(downstream)}\n"
+                output += f"- {format_entity(DATAHUB_FRONTEND_URL, downstream)}\n"
             if len(downstreams) > MAX_IMPACTED_DOWNSTREAMS:
-                output += f"- ...and [{len(downstreams) - MAX_IMPACTED_DOWNSTREAMS} more]({datahub_url_from_urn(urn, suffix='/Lineage')})\n"
+                output += f"- ...and [{len(downstreams) - MAX_IMPACTED_DOWNSTREAMS} more]({datahub_url_from_urn(DATAHUB_FRONTEND_URL, urn, suffix='/Lineage')})\n"
         else:
             output += "No downstreams impacted.\n"
 
@@ -299,4 +279,8 @@ def main():
 
 
 if __name__ == "__main__":
+    logging.basicConfig(level=logging.INFO)
+    logging.getLogger("datahub").setLevel(logging.DEBUG)
+    logging.getLogger(__name__).setLevel(logging.DEBUG)
+
     main()
diff --git a/src/rendering.py b/src/rendering.py
new file mode 100644
index 0000000..8f4e793
--- /dev/null
+++ b/src/rendering.py
@@ -0,0 +1,33 @@
+from typing import Optional
+
+from datahub.utilities.urns.urn import Urn, guess_entity_type
+
+
+def datahub_url_from_urn(
+    frontend_base_url: str, urn: str, suffix: Optional[str] = None
+) -> str:
+    entity_type = guess_entity_type(urn)
+    if entity_type == "dataJob":
+        entity_type = "tasks"
+    elif entity_type == "dataFlow":
+        entity_type = "pipelines"
+
+    url = f"{frontend_base_url}/{entity_type}/{Urn.url_encode(urn)}"
+    if suffix:
+        url += f"/{suffix}"
+    return url
+
+
+def format_entity(frontend_base_url: str, downstream: dict) -> str:
+    platform = downstream["platform"]["name"]
+    if downstream["platform"].get("properties", {}).get("displayName"):
+        platform = downstream["platform"]["properties"]["displayName"]
+
+    name = downstream["properties"]["name"]
+    url = datahub_url_from_urn(frontend_base_url, downstream["urn"])
+
+    type: str = downstream["type"].capitalize()
+    if downstream.get("subTypes"):
+        type = downstream["subTypes"]["typeNames"][0]
+
+    return f"{platform} {type} [{name}]({url})"
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..a4ed64c
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,7 @@
+import sys
+import pathlib
+
+here = pathlib.Path(__file__).parent
+src = here.parent / "src"
+
+sys.path.insert(0, str(src))
diff --git a/tests/test_render.py b/tests/test_render.py
new file mode 100644
index 0000000..36c37fe
--- /dev/null
+++ b/tests/test_render.py
@@ -0,0 +1,11 @@
+from rendering import datahub_url_from_urn
+
+
+def test_url_generation():
+    assert (
+        datahub_url_from_urn(
+            "https://customer.acryl.io",
+            "urn:li:dataset:(urn:li:dataPlatform:snowflake,snowflake_sample_data.tpch_sf1000.orders,PROD)",
+        )
+        == "https://customer.acryl.io/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Csnowflake_sample_data.tpch_sf1000.orders%2CPROD%29"
+    )