Skip to content

Commit

Permalink
fix(ingest/powerbi): patch lineage for powerbi report (datahub-projec…
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW authored Apr 12, 2024
1 parent 8ed87d6 commit 5497393
Show file tree
Hide file tree
Showing 16 changed files with 342 additions and 512 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
from typing import Iterable, Optional
from typing import Iterable, Optional, Union

from pydantic.fields import Field

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import datahub_guid, set_aspect
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
ChartInfoClass,
DashboardInfoClass,
FineGrainedLineageClass,
MetadataChangeEventClass,
SystemMetadataClass,
UpstreamLineageClass,
)
from datahub.specific.chart import ChartPatchBuilder
from datahub.specific.dashboard import DashboardPatchBuilder
from datahub.specific.dataset import DatasetPatchBuilder


Expand All @@ -28,6 +32,62 @@ def convert_upstream_lineage_to_patch(
return MetadataWorkUnit(id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp)


def convert_chart_info_to_patch(
urn: str, aspect: ChartInfoClass, system_metadata: Optional[SystemMetadataClass]
) -> Union[MetadataWorkUnit, None]:
patch_builder = ChartPatchBuilder(urn, system_metadata)

if aspect.customProperties:
for key in aspect.customProperties:
patch_builder.add_custom_property(
key, str(aspect.customProperties.get(key))
)

if aspect.inputEdges:
for inputEdge in aspect.inputEdges:
patch_builder.add_input_edge(inputEdge)

values = patch_builder.build()

if values:
mcp = next(iter(values))
return MetadataWorkUnit(
id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp
)
else:
return None


def convert_dashboard_info_to_patch(
urn: str, aspect: DashboardInfoClass, system_metadata: Optional[SystemMetadataClass]
) -> Union[MetadataWorkUnit, None]:
patch_builder = DashboardPatchBuilder(urn, system_metadata)

if aspect.customProperties:
for key in aspect.customProperties:
patch_builder.add_custom_property(
key, str(aspect.customProperties.get(key))
)

if aspect.datasetEdges:
for datasetEdge in aspect.datasetEdges:
patch_builder.add_dataset_edge(datasetEdge)

if aspect.chartEdges:
for chartEdge in aspect.chartEdges:
patch_builder.add_chart_edge(chartEdge)

values = patch_builder.build()

if values:
mcp = next(iter(values))
return MetadataWorkUnit(
id=MetadataWorkUnit.generate_workunit_id(mcp), mcp_raw=mcp
)
else:
return None


def get_fine_grained_lineage_key(fine_upstream: FineGrainedLineageClass) -> str:
return datahub_guid(
{
Expand Down
28 changes: 25 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@
from datahub.metadata.urns import ChartUrn
from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo
from datahub.utilities.dedup_list import deduplicate_list
from src.datahub.ingestion.api.incremental_lineage_helper import (
convert_dashboard_info_to_patch,
)

# Logger instance
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -1300,17 +1303,36 @@ def get_workspace_workunit(
# Convert PowerBi Dashboard and child entities to Datahub work unit to ingest into Datahub
workunits = self.mapper.to_datahub_work_units(dashboard, workspace)
for workunit in workunits:
# Return workunit to Datahub Ingestion framework
yield workunit
wu = self._get_dashboard_patch_work_unit(workunit)
if wu is not None:
yield wu

for report in workspace.reports:
for work_unit in self.mapper.report_to_datahub_work_units(
report, workspace
):
yield work_unit
wu = self._get_dashboard_patch_work_unit(work_unit)
if wu is not None:
yield wu

yield from self.extract_independent_datasets(workspace)

def _get_dashboard_patch_work_unit(
self, work_unit: MetadataWorkUnit
) -> MetadataWorkUnit:
dashboard_info_aspect: Optional[
DashboardInfoClass
] = work_unit.get_aspect_of_type(DashboardInfoClass)

if dashboard_info_aspect:
return convert_dashboard_info_to_patch(
work_unit.get_urn(),
dashboard_info_aspect,
work_unit.metadata.systemMetadata,
)
else:
return work_unit

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
# As modified_workspaces is not idempotent, hence workunit processors are run later for each workspace_id
# This will result in creating checkpoint for each workspace_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,34 +294,26 @@
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"changeType": "PATCH",
"aspectName": "dashboardInfo",
"aspect": {
"json": {
"customProperties": {
"chartCount": "2",
"workspaceName": "demo-workspace",
"workspaceId": "64ED5CAD-7C10-4684-8180-826122881108"
"json": [
{
"op": "add",
"path": "/customProperties/chartCount",
"value": "2"
},
"title": "test_dashboard",
"description": "Description of test dashboard",
"charts": [
"urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)"
],
"datasets": [],
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
{
"op": "add",
"path": "/customProperties/workspaceName",
"value": "demo-workspace"
},
"dashboardUrl": "https://localhost/dashboards/web/1"
}
{
"op": "add",
"path": "/customProperties/workspaceId",
"value": "64ED5CAD-7C10-4684-8180-826122881108"
}
]
},
"systemMetadata": {
"lastObserved": 1643871600000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1208,34 +1208,26 @@
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"changeType": "PATCH",
"aspectName": "dashboardInfo",
"aspect": {
"json": {
"customProperties": {
"chartCount": "2",
"workspaceName": "demo-workspace",
"workspaceId": "64ED5CAD-7C10-4684-8180-826122881108"
"json": [
{
"op": "add",
"path": "/customProperties/chartCount",
"value": "2"
},
"title": "test_dashboard",
"description": "",
"charts": [
"urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)"
],
"datasets": [],
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
{
"op": "add",
"path": "/customProperties/workspaceName",
"value": "demo-workspace"
},
"dashboardUrl": "https://localhost/dashboards/web/1"
}
{
"op": "add",
"path": "/customProperties/workspaceId",
"value": "64ED5CAD-7C10-4684-8180-826122881108"
}
]
},
"systemMetadata": {
"lastObserved": 1643871600000,
Expand Down Expand Up @@ -1959,37 +1951,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,reports.5b218778-e7a5-4d73-8187-f10824047715)",
"changeType": "UPSERT",
"aspectName": "dashboardInfo",
"aspect": {
"json": {
"customProperties": {},
"title": "SalesMarketing",
"description": "Acryl sales marketing report",
"charts": [],
"datasets": [],
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
},
"dashboardUrl": "https://app.powerbi.com/groups/f089354e-8366-4e18-aea3-4cb4a3a50b48/reports/5b218778-e7a5-4d73-8187-f10824047715"
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,reports.5b218778-e7a5-4d73-8187-f10824047715)",
Expand Down
40 changes: 16 additions & 24 deletions metadata-ingestion/tests/integration/powerbi/golden_test_cll.json
Original file line number Diff line number Diff line change
Expand Up @@ -1233,34 +1233,26 @@
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(powerbi,dashboards.7D668CAD-7FFC-4505-9215-655BCA5BEBAE)",
"changeType": "UPSERT",
"changeType": "PATCH",
"aspectName": "dashboardInfo",
"aspect": {
"json": {
"customProperties": {
"chartCount": "2",
"workspaceName": "demo-workspace",
"workspaceId": "64ED5CAD-7C10-4684-8180-826122881108"
"json": [
{
"op": "add",
"path": "/customProperties/chartCount",
"value": "2"
},
"title": "test_dashboard",
"description": "Description of test dashboard",
"charts": [
"urn:li:chart:(powerbi,charts.B8E293DC-0C83-4AA0-9BB9-0A8738DF24A0)",
"urn:li:chart:(powerbi,charts.23212598-23b5-4980-87cc-5fc0ecd84385)"
],
"datasets": [],
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
{
"op": "add",
"path": "/customProperties/workspaceName",
"value": "demo-workspace"
},
"dashboardUrl": "https://localhost/dashboards/web/1"
}
{
"op": "add",
"path": "/customProperties/workspaceId",
"value": "64ED5CAD-7C10-4684-8180-826122881108"
}
]
},
"systemMetadata": {
"lastObserved": 1643871600000,
Expand Down
Loading

0 comments on commit 5497393

Please sign in to comment.