Added storage for direct filesystem references in code (#2526)
## Changes
On top of linting, this change collects Direct File System Access (DFSA) records and stores them in the inventory database.

### Linked issues
Progresses #2350 

### Functionality
- [x] added a new table `directfs_in_paths`

### Tests
- [x] added unit tests
- [x] updated integration tests
- [x] manually tested schema upgrade:

![Screenshot 2024-09-13 at 16 00 19](https://github.com/user-attachments/assets/1e3d1bcc-a5c8-446a-a074-646f50efd1ba)

---------

Co-authored-by: Eric Vergnaud <[email protected]>
Co-authored-by: Serge Smertin <[email protected]>
Co-authored-by: Andrew Snare <[email protected]>
4 people authored Sep 16, 2024
1 parent cfc6190 commit 60e77e0
Showing 19 changed files with 627 additions and 119 deletions.
6 changes: 6 additions & 0 deletions src/databricks/labs/ucx/contexts/application.py
@@ -15,6 +15,7 @@
from databricks.labs.ucx.recon.metadata_retriever import DatabricksTableMetadataRetriever
from databricks.labs.ucx.recon.migration_recon import MigrationRecon
from databricks.labs.ucx.recon.schema_comparator import StandardSchemaComparator
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawlers
from databricks.labs.ucx.source_code.python_libraries import PythonLibraryResolver
from databricks.sdk import AccountClient, WorkspaceClient, core
from databricks.sdk.errors import ResourceDoesNotExist
@@ -425,9 +426,14 @@ def workflow_linter(self):
self.dependency_resolver,
self.path_lookup,
TableMigrationIndex([]), # TODO: bring back self.tables_migrator.index()
self.directfs_access_crawlers,
self.config.include_job_ids,
)

@cached_property
def directfs_access_crawlers(self):
return DirectFsAccessCrawlers(self.sql_backend, self.inventory_database)

@cached_property
def redash(self):
return Redash(
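The new `directfs_access_crawlers` property is wired like the other `@cached_property` members of the application context, so both crawlers share one `SqlBackend` and one inventory schema. A minimal sketch of the equivalent construction, using lsql's `MockBackend` and an assumed schema name `"ucx"`:

```python
from databricks.labs.lsql.backends import MockBackend

from databricks.labs.ucx.source_code.directfs_access import DirectFsAccessCrawlers

# Mirrors the cached_property above: one backend, one inventory schema.
crawlers = DirectFsAccessCrawlers(MockBackend(), "ucx")
paths_crawler = crawlers.for_paths()      # persists to ucx.directfs_in_paths
queries_crawler = crawlers.for_queries()  # persists to ucx.directfs_in_queries
```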
5 changes: 5 additions & 0 deletions src/databricks/labs/ucx/install.py
@@ -75,6 +75,7 @@
from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
from databricks.labs.ucx.recon.migration_recon import ReconResult
from databricks.labs.ucx.runtime import Workflows
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess
from databricks.labs.ucx.source_code.jobs import JobProblem
from databricks.labs.ucx.workspace_access.base import Permissions
from databricks.labs.ucx.workspace_access.generic import WorkspaceObjectInfo
@@ -120,6 +121,9 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
functools.partial(table, "udfs", Udf),
functools.partial(table, "logs", LogRecord),
functools.partial(table, "recon_results", ReconResult),
functools.partial(
table, "directfs_in_paths", DirectFsAccess
), # directfs_in_queries will be added in upcoming PR
],
)
deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql")
@@ -128,6 +132,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
deployer.deploy_view("misc_patterns", "queries/views/misc_patterns.sql")
deployer.deploy_view("code_patterns", "queries/views/code_patterns.sql")
deployer.deploy_view("reconciliation_results", "queries/views/reconciliation_results.sql")
# direct_file_system_access view will be added in upcoming PR


def extract_major_minor(version_string):
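In `deploy_schema`, each table is registered as a zero-argument callable via `functools.partial`; the dataclass supplies the column layout when the deployer later invokes it. A sketch of the pattern in isolation, with the `table` helper stubbed out (the real one delegates to the schema deployer):

```python
import functools
from dataclasses import dataclass


@dataclass
class DirectFsAccess:  # trimmed stand-in; the full record lives in directfs_access.py
    path: str
    is_read: bool
    is_write: bool


def table(name: str, klass: type) -> None:
    # stand-in for the `table` helper bound earlier in deploy_schema;
    # the real one creates `name` with columns derived from `klass` fields
    print(f"deploying {name} from {klass.__name__}")


deploy_directfs = functools.partial(table, "directfs_in_paths", DirectFsAccess)
deploy_directfs()  # deploy_schema invokes all such partials together
```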
2 changes: 1 addition & 1 deletion src/databricks/labs/ucx/mixins/fixtures.py
@@ -833,7 +833,7 @@ def create(notebook_path: str | Path | None = None, **kwargs):

job = ws.jobs.create(**kwargs)
logger.info(f"Job: {ws.config.host}#job/{job.job_id}")
return job
return ws.jobs.get(job.job_id)

yield from factory("job", create, lambda item: ws.jobs.delete(item.job_id))

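The one-line fixture change above compensates for a quirk of the Jobs API: assuming standard databricks-sdk behavior, `jobs.create` returns a slim response carrying little more than the new `job_id`, whereas `jobs.get` returns the full `Job`, settings included, which tests downstream rely on:

```python
# `ws` is an assumed databricks.sdk.WorkspaceClient
created = ws.jobs.create(name="demo")  # slim response: essentially just job_id
job = ws.jobs.get(created.job_id)      # full Job, including settings
assert job.settings is not None
```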
1 change: 1 addition & 0 deletions src/databricks/labs/ucx/source_code/base.py
@@ -15,6 +15,7 @@
from databricks.sdk.service.workspace import Language

from databricks.labs.blueprint.paths import WorkspacePath

from databricks.labs.ucx.source_code.python.python_ast import Tree

# Code mapping between LSP, PyLint, and our own diagnostics:
141 changes: 141 additions & 0 deletions src/databricks/labs/ucx/source_code/directfs_access.py
@@ -0,0 +1,141 @@
from __future__ import annotations


import logging
from collections.abc import Sequence, Iterable
from dataclasses import dataclass, field
from datetime import datetime

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import DatabricksError

logger = logging.getLogger(__name__)


@dataclass
class LineageAtom:

object_type: str
object_id: str
other: dict[str, str] | None = None


@dataclass
class DirectFsAccess:
"""A record describing a Direct File System Access"""

UNKNOWN = "unknown"

path: str
is_read: bool
is_write: bool
source_id: str = UNKNOWN
source_timestamp: datetime = datetime.fromtimestamp(0)
source_lineage: list[LineageAtom] = field(default_factory=list)
job_id: int = -1
job_name: str = UNKNOWN
task_key: str = UNKNOWN
assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
assessment_end_timestamp: datetime = datetime.fromtimestamp(0)

def replace_source(
self,
source_id: str | None = None,
source_lineage: list[LineageAtom] | None = None,
source_timestamp: datetime | None = None,
):
return DirectFsAccess(
path=self.path,
is_read=self.is_read,
is_write=self.is_write,
source_id=source_id or self.source_id,
source_timestamp=source_timestamp or self.source_timestamp,
source_lineage=source_lineage or self.source_lineage,
job_id=self.job_id,
job_name=self.job_name,
task_key=self.task_key,
assessment_start_timestamp=self.assessment_start_timestamp,
assessment_end_timestamp=self.assessment_end_timestamp,
)

def replace_job_infos(
self,
job_id: int | None = None,
job_name: str | None = None,
task_key: str | None = None,
):
return DirectFsAccess(
path=self.path,
is_read=self.is_read,
is_write=self.is_write,
source_id=self.source_id,
source_timestamp=self.source_timestamp,
source_lineage=self.source_lineage,
job_id=job_id or self.job_id,
job_name=job_name or self.job_name,
task_key=task_key or self.task_key,
assessment_start_timestamp=self.assessment_start_timestamp,
assessment_end_timestamp=self.assessment_end_timestamp,
)

def replace_assessment_infos(
self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
):
return DirectFsAccess(
path=self.path,
is_read=self.is_read,
is_write=self.is_write,
source_id=self.source_id,
source_timestamp=self.source_timestamp,
source_lineage=self.source_lineage,
job_id=self.job_id,
job_name=self.job_name,
task_key=self.task_key,
assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
)


class _DirectFsAccessCrawler(CrawlerBase[DirectFsAccess]):

def __init__(self, backend: SqlBackend, schema: str, table: str):
"""
Initializes a _DirectFsAccessCrawler instance.
Args:
backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark)
schema: The schema name for the inventory persistence.
table: The table name for the DFSA records.
"""
super().__init__(backend, "hive_metastore", schema, table, DirectFsAccess)

def dump_all(self, dfsas: Sequence[DirectFsAccess]):
"""This crawler doesn't follow the pull model because the fetcher fetches data for 2 crawlers, not just one
It's not **bad** because all records are pushed at once.
Providing a multi-entity crawler is out-of-scope of this PR
"""
try:
# TODO until we historize data, we append all DFSAs
self._update_snapshot(dfsas, mode="append")
except DatabricksError as e:
logger.error("Failed to store DFSAs", exc_info=e)

def _try_fetch(self) -> Iterable[DirectFsAccess]:
sql = f"SELECT * FROM {self.full_name}"
yield from self._backend.fetch(sql)

def _crawl(self) -> Iterable[DirectFsAccess]:
raise NotImplementedError()


class DirectFsAccessCrawlers:

def __init__(self, sql_backend: SqlBackend, schema: str):
self._sql_backend = sql_backend
self._schema = schema

def for_paths(self) -> _DirectFsAccessCrawler:
return _DirectFsAccessCrawler(self._sql_backend, self._schema, "directfs_in_paths")

def for_queries(self) -> _DirectFsAccessCrawler:
return _DirectFsAccessCrawler(self._sql_backend, self._schema, "directfs_in_queries")
16 changes: 15 additions & 1 deletion src/databricks/labs/ucx/source_code/graph.py
@@ -1,6 +1,7 @@
from __future__ import annotations

import abc
import itertools
import logging
from dataclasses import dataclass
from pathlib import Path
@@ -11,6 +12,7 @@
NodeNG,
)
from databricks.labs.ucx.source_code.base import Advisory, CurrentSessionState, is_a_notebook
from databricks.labs.ucx.source_code.directfs_access import LineageAtom
from databricks.labs.ucx.source_code.python.python_ast import Tree
from databricks.labs.ucx.source_code.path_lookup import PathLookup

@@ -304,7 +306,7 @@ class DependencyGraphContext:
session_state: CurrentSessionState


class Dependency(abc.ABC):
class Dependency:

def __init__(self, loader: DependencyLoader, path: Path, inherits_context=True):
self._loader = loader
@@ -331,6 +333,10 @@ def load(self, path_lookup: PathLookup) -> SourceContainer | None:
def __repr__(self):
return f"Dependency<{self.path}>"

@property
def lineage(self) -> list[LineageAtom]:
return [LineageAtom("path", str(self.path))]


class SourceContainer(abc.ABC):

@@ -608,6 +614,7 @@ def __init__(self, graph: DependencyGraph, walked_paths: set[Path], path_lookup:
self._graph = graph
self._walked_paths = walked_paths
self._path_lookup = path_lookup
self._lineage: list[Dependency] = []

def __iter__(self) -> Iterator[T]:
for dependency in self._graph.root_dependencies:
@@ -619,6 +626,7 @@ def __iter__(self) -> Iterator[T]:
def _iter_one(self, dependency: Dependency, graph: DependencyGraph, root_path: Path) -> Iterable[T]:
if dependency.path in self._walked_paths:
return
self._lineage.append(dependency)
self._walked_paths.add(dependency.path)
self._log_walk_one(dependency)
if dependency.path.is_file() or is_a_notebook(dependency.path):
@@ -631,6 +639,7 @@ def _iter_one(self, dependency: Dependency, graph: DependencyGraph, root_path: P
child_graph = maybe_graph.graph
for child_dependency in child_graph.local_dependencies:
yield from self._iter_one(child_dependency, child_graph, root_path)
self._lineage.pop()

def _log_walk_one(self, dependency: Dependency):
logger.debug(f'Analyzing dependency: {dependency}')
@@ -639,3 +648,8 @@ def _log_walk_one(self, dependency: Dependency):
def _process_dependency(
self, dependency: Dependency, path_lookup: PathLookup, inherited_tree: Tree | None
) -> Iterable[T]: ...

@property
def lineage(self) -> list[LineageAtom]:
lists: list[list[LineageAtom]] = [dependency.lineage for dependency in self._lineage]
return list(itertools.chain(*lists))
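The walker treats `_lineage` as a stack: a dependency is pushed before its children are visited and popped on the way back up, so `lineage` always reflects the root-to-current chain of the walk. A stand-alone sketch of that push/visit/pop discipline, with graph nodes stubbed as plain strings:

```python
# Hypothetical illustration of the walker's lineage bookkeeping;
# `children` maps each node to its local dependencies.
def walk(node: str, children: dict[str, list[str]], stack: list[str], out: list[list[str]]) -> None:
    stack.append(node)        # push before descending
    out.append(list(stack))   # lineage visible at this node
    for child in children.get(node, []):
        walk(child, children, stack, out)
    stack.pop()               # pop on the way back up


lineages: list[list[str]] = []
walk("job", {"job": ["nb1", "nb2"], "nb1": ["util.py"]}, [], lineages)
# lineages == [["job"], ["job", "nb1"], ["job", "nb1", "util.py"], ["job", "nb2"]]
```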