Added storage for direct filesystem references in code #2526
Merged
+627
−119
107 commits
18950f1
add support for sql functional tests
ericvergnaud 224796d
disable
ericvergnaud b63cb19
more functional tests
ericvergnaud 6d911ca
move test to functional
ericvergnaud 5f07dc8
formatting
ericvergnaud a8f3ecd
formatting
ericvergnaud f0de866
Merge branch 'main' into support-sql-notebooks-in-functional-tests
ericvergnaud 505402f
enhance dbfs linting to all direct file system access
ericvergnaud 992ffe7
use dfsa for pyspark
ericvergnaud 830b026
fix duplicate advice
ericvergnaud a1e15e7
fix functional tests
ericvergnaud eb56636
formatting
ericvergnaud b8e6b82
move python parsing code under dedicated package
ericvergnaud 472c417
move PythnCodeAnalyzer to dedicated file
ericvergnaud 7345d67
Merge branch 'refactor-python-code-analysis' into lint-direct-file-sy…
ericvergnaud 3e256be
fix merge issues
ericvergnaud d1d251f
merge from stale branch
ericvergnaud 13ea1e6
more tests
ericvergnaud 93d194f
merge from stale branch
ericvergnaud f4bc0b8
merge from stale branch
ericvergnaud 7ae9adc
fix failing tests
ericvergnaud fdf7a39
rename ctor arg
ericvergnaud 054f847
fix infinite recursion with unknown ASTs
ericvergnaud 9c4a5bf
fix infinite recursion with unknown ASTs
ericvergnaud 6889852
make register_magic_command a decorator
ericvergnaud 7792199
formatting
ericvergnaud b4ba5ae
integrate with WorkflowLinter
ericvergnaud 2646acc
fix failing tests
ericvergnaud 361aa2e
Merge branch 'lint-direct-file-system-access' into store-dfsa-records
ericvergnaud a5416c0
finalize integration
ericvergnaud c94ee81
Merge branch 'refactor-python-code-analysis' into lint-direct-file-sy…
ericvergnaud 4a30a79
Merge branch 'lint-direct-file-system-access' into store-dfsa-records
ericvergnaud bba91c9
add logs
ericvergnaud a2f66c4
Merge branch 'main' into refactor-python-code-analysis
ericvergnaud 2da934e
enhance integration test for checking stored DFSAs
ericvergnaud 353871f
Merge branch 'refactor-python-code-analysis' into lint-direct-file-sy…
ericvergnaud e1329a6
Merge branch 'lint-direct-file-system-access' into store-dfsa-records
ericvergnaud 5c444ff
move 'magic'-related stuff to dedicated file
ericvergnaud 4d14367
Merge branch 'main' into refactor-python-code-analysis
ericvergnaud 7f93d10
formatting
ericvergnaud 6258908
Merge branch 'refactor-python-code-analysis' into store-dfsa-records
ericvergnaud 7192e85
fix failing tests
ericvergnaud 46cd358
Merge branch 'refactor-python-code-analysis' into lint-direct-file-sy…
ericvergnaud 2652e00
Merge branch 'lint-direct-file-system-access' into store-dfsa-records
ericvergnaud d4be072
formatting
ericvergnaud 0f3941a
Merge branch 'main' into refactor-python-code-analysis
ericvergnaud e552e3e
rename dfsa -> directfs
ericvergnaud e30ccfa
improve naming and drop /tmp/ exclusion
ericvergnaud 2fd1d49
Merge branch 'main' into lint-direct-file-system-access
ericvergnaud 4c48951
Update docs
ericvergnaud 6e07c9b
move to functional test
ericvergnaud cd3b115
update docs
ericvergnaud c3ee620
Merge branch 'refactor-python-code-analysis' into lint-direct-file-sy…
ericvergnaud 26096bd
Merge branch 'lint-direct-file-system-access' into store-dfsa-records
ericvergnaud 8fea3eb
improve naming and comments
ericvergnaud 8ee8a77
Merge branch 'lint-direct-file-system-access' into store-dfsa-records
ericvergnaud 64de1e0
fix failing test
ericvergnaud de0c4f7
Merge branch 'main' into store-dfsa-records
ericvergnaud 6ce8f86
Merge branch 'main' into store-dfsa-records
ericvergnaud 5b980df
store DFSAs for paths and queries in dedicated tables
ericvergnaud cdcc3e1
support lineage when walking dependency graph
ericvergnaud 1515342
store dfsa lineage
ericvergnaud ca058ed
Merge branch 'main' into store-dfsa-records
ericvergnaud 1c88c97
fix merge issues
ericvergnaud ef261a6
capture and store source_timestamp
ericvergnaud d951286
simplify
ericvergnaud ea74ba6
capture and store job/task infos
ericvergnaud e75e07d
simplify
ericvergnaud e62fd18
capture and store assessment start/stop, also drop source_type
ericvergnaud b58f47d
drop mock_dfsa_crawlers
ericvergnaud 5a5d4ff
rename _backend -> _sql_backend
ericvergnaud 404f6cd
rename _backend -> _sql_backend
ericvergnaud cb6e45d
hdfs is irrelevant, replace with dbfs
ericvergnaud 733decc
drop mock of DirectFsAccessCrawlers
ericvergnaud 6338af6
gather and store dfsas from refresh_report
ericvergnaud 166e34c
prevent pylint warning
ericvergnaud 8bc3c39
Merge branch 'main' into store-dfsa-records
ericvergnaud 0249310
fix failing tests
ericvergnaud b605b77
formatting
ericvergnaud 7f9fa06
fix failing tests
ericvergnaud f7108cb
fix failing tests
ericvergnaud abcab87
fix failing tests
ericvergnaud eb603e4
catch infinite recursion
ericvergnaud 837fe6e
drop legacy code
ericvergnaud e80b6f0
Revert "catch infinite recursion"
ericvergnaud f8cdc22
Use structured lineage for DependencyGraph (#2556)
ericvergnaud 9d4bc86
Merge branch 'main' into store-dfsa-records
ericvergnaud 697a85e
Merge branch 'main' into store-dfsa-records
ericvergnaud 8cb4ac0
fix merge issues
ericvergnaud 185d060
Update src/databricks/labs/ucx/source_code/base.py
ericvergnaud b09a549
Merge branch 'main' into store-dfsa-records
ericvergnaud 1742415
refactor DirectFsAccess
ericvergnaud c5987bb
add view
ericvergnaud b6fbeb4
fix failing tests
ericvergnaud cc194b6
formatting
ericvergnaud a5ada24
install added table
ericvergnaud 4e45840
Merge branch 'main' into store-dfsa-records
ericvergnaud 2ce8e42
address verbal comments from @asnare
ericvergnaud f133b97
Merge branch 'main' into store-dfsa-records
ericvergnaud 145fbae
rename table and drop unused view
ericvergnaud 810e356
rename method that is not yet in line with new crawler design
ericvergnaud 64b8a84
Merge branch 'main' into store-dfsa-records
ericvergnaud 9f24f97
rename method that is not yet in line with new crawler design
ericvergnaud b93b565
Update src/databricks/labs/ucx/source_code/jobs.py
ericvergnaud 3f846a8
document design decision
ericvergnaud 67f9d68
simplify
ericvergnaud 506a761
Merge branch 'store-dfsa-records' of github.com:databrickslabs/ucx in…
ericvergnaud
@@ -75,6 +75,7 @@
from databricks.labs.ucx.installer.workflows import WorkflowsDeployment
from databricks.labs.ucx.recon.migration_recon import ReconResult
from databricks.labs.ucx.runtime import Workflows
from databricks.labs.ucx.source_code.directfs_access import DirectFsAccess
from databricks.labs.ucx.source_code.jobs import JobProblem
from databricks.labs.ucx.workspace_access.base import Permissions
from databricks.labs.ucx.workspace_access.generic import WorkspaceObjectInfo

@@ -120,6 +121,9 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
            functools.partial(table, "udfs", Udf),
            functools.partial(table, "logs", LogRecord),
            functools.partial(table, "recon_results", ReconResult),
            functools.partial(
                table, "directfs_in_paths", DirectFsAccess
            ),  # directfs_in_queries will be added in upcoming PR
        ],
    )
    deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql")

@@ -128,6 +132,7 @@ def deploy_schema(sql_backend: SqlBackend, inventory_schema: str):
    deployer.deploy_view("misc_patterns", "queries/views/misc_patterns.sql")
    deployer.deploy_view("code_patterns", "queries/views/code_patterns.sql")
    deployer.deploy_view("reconciliation_results", "queries/views/reconciliation_results.sql")
    # direct_file_system_access view will be added in upcoming PR


def extract_major_minor(version_string):
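The hunk above registers the new table as a `functools.partial`, that is, a zero-argument callable that is only invoked later when the tables are deployed. A minimal sketch of that pattern follows; `deploy_table` and the placeholder `DirectFsAccess` class are hypothetical stand-ins for illustration, not the UCX deployer or record type.

```python
import functools


class DirectFsAccess:
    """Placeholder for the real record class (assumption for this sketch)."""


def deploy_table(name: str, klass: type) -> str:
    """Hypothetical stand-in for a deployer's table-creation call."""
    return f"CREATE TABLE {name} ({klass.__name__})"


# Each entry binds its arguments now but runs only when called.
tasks = [
    functools.partial(deploy_table, "directfs_in_paths", DirectFsAccess),
]

# A runner can later execute every task without knowing its arguments.
results = [task() for task in tasks]
```

Binding the arguments up front lets a generic runner treat every table deployment as the same kind of callable, which is presumably why the list in `deploy_schema` is built this way.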
@@ -0,0 +1,141 @@
from __future__ import annotations

import logging
from collections.abc import Sequence, Iterable
from dataclasses import dataclass, field
from datetime import datetime

from databricks.labs.ucx.framework.crawlers import CrawlerBase
from databricks.labs.lsql.backends import SqlBackend
from databricks.sdk.errors import DatabricksError

logger = logging.getLogger(__name__)


@dataclass
class LineageAtom:

    object_type: str
    object_id: str
    other: dict[str, str] | None = None


@dataclass
class DirectFsAccess:
    """A record describing a Direct File System Access"""

    UNKNOWN = "unknown"

    path: str
    is_read: bool
    is_write: bool
    source_id: str = UNKNOWN
    source_timestamp: datetime = datetime.fromtimestamp(0)
    source_lineage: list[LineageAtom] = field(default_factory=list)
    job_id: int = -1
    job_name: str = UNKNOWN
    task_key: str = UNKNOWN
    assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
    assessment_end_timestamp: datetime = datetime.fromtimestamp(0)

    # Review comment, truncated in the page capture: in a follow-up PR, please use [...]
    def replace_source(
        self,
        source_id: str | None = None,
        source_lineage: list[LineageAtom] | None = None,
        source_timestamp: datetime | None = None,
    ):
        return DirectFsAccess(
            path=self.path,
            is_read=self.is_read,
            is_write=self.is_write,
            source_id=source_id or self.source_id,
            source_timestamp=source_timestamp or self.source_timestamp,
            source_lineage=source_lineage or self.source_lineage,
            job_id=self.job_id,
            job_name=self.job_name,
            task_key=self.task_key,
            assessment_start_timestamp=self.assessment_start_timestamp,
            assessment_end_timestamp=self.assessment_end_timestamp,
        )

    def replace_job_infos(
        self,
        job_id: int | None = None,
        job_name: str | None = None,
        task_key: str | None = None,
    ):
        return DirectFsAccess(
            path=self.path,
            is_read=self.is_read,
            is_write=self.is_write,
            source_id=self.source_id,
            source_timestamp=self.source_timestamp,
            source_lineage=self.source_lineage,
            job_id=job_id or self.job_id,
            job_name=job_name or self.job_name,
            task_key=task_key or self.task_key,
            assessment_start_timestamp=self.assessment_start_timestamp,
            assessment_end_timestamp=self.assessment_end_timestamp,
        )

    def replace_assessment_infos(
        self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
    ):
        return DirectFsAccess(
            path=self.path,
            is_read=self.is_read,
            is_write=self.is_write,
            source_id=self.source_id,
            source_timestamp=self.source_timestamp,
            source_lineage=self.source_lineage,
            job_id=self.job_id,
            job_name=self.job_name,
            task_key=self.task_key,
            assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
            assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
        )


class _DirectFsAccessCrawler(CrawlerBase[DirectFsAccess]):

    def __init__(self, backend: SqlBackend, schema: str, table: str):
        """
        Initializes a DFSACrawler instance.

        Args:
            backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark)
            schema: The schema name for the inventory persistence.
            table: The table name for the inventory persistence.
        """
        super().__init__(backend, "hive_metastore", schema, table, DirectFsAccess)

    def dump_all(self, dfsas: Sequence[DirectFsAccess]):
        """This crawler doesn't follow the pull model because the fetcher fetches data for 2 crawlers, not just one.
        It's not **bad** because all records are pushed at once.
        Providing a multi-entity crawler is out of scope for this PR.
        """
        try:
            # TODO until we historize data, we append all DFSAs
            self._update_snapshot(dfsas, mode="append")
        except DatabricksError as e:
            logger.error("Failed to store DFSAs", exc_info=e)

    def _try_fetch(self) -> Iterable[DirectFsAccess]:
        sql = f"SELECT * FROM {self.full_name}"
        yield from self._backend.fetch(sql)

    def _crawl(self) -> Iterable[DirectFsAccess]:
        raise NotImplementedError()


class DirectFsAccessCrawlers:

    def __init__(self, sql_backend: SqlBackend, schema: str):
        self._sql_backend = sql_backend
        self._schema = schema

    def for_paths(self) -> _DirectFsAccessCrawler:
        return _DirectFsAccessCrawler(self._sql_backend, self._schema, "directfs_in_paths")

    def for_queries(self) -> _DirectFsAccessCrawler:
        return _DirectFsAccessCrawler(self._sql_backend, self._schema, "directfs_in_queries")
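The `replace_*` methods in this file copy every field by hand and fall back with `value or self.value`. Hand-copying is easy to get wrong, and the `or` fallback silently discards falsy but valid overrides such as `job_id=0`. One possible alternative is sketched below with the standard-library `dataclasses.replace` and explicit `is not None` checks. The `Access` class is a hypothetical, trimmed-down stand-in for `DirectFsAccess`; nothing here is confirmed to be what a follow-up PR adopted.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Access:
    """Hypothetical, trimmed-down stand-in for DirectFsAccess."""

    path: str
    job_id: int = -1
    job_name: str = "unknown"

    def replace_job_infos(self, job_id: int | None = None, job_name: str | None = None) -> Access:
        # Keep only the overrides that were explicitly provided; `is not None`
        # preserves falsy-but-valid values such as job_id=0.
        candidates = {"job_id": job_id, "job_name": job_name}
        changes = {key: value for key, value in candidates.items() if value is not None}
        # dataclasses.replace copies all other fields, so none can be mixed up.
        return replace(self, **changes)


original = Access(path="dbfs:/mnt/data", job_id=7)
updated = original.replace_job_infos(job_id=0, job_name="nightly")
```

Because `replace` builds a new instance, `original` is left untouched, which matches the copy-on-change behaviour of the methods in the diff.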
Adding direct_file_system_access_in_queries table will be done in PR #2599
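The crawler pair in this PR already exposes `for_paths()` and `for_queries()`, each writing to its own table in append mode, even though only the paths table is deployed here. A rough, self-contained sketch of that push model is shown below; `InMemoryBackend`, `PushCrawler`, `Crawlers` and `Record` are hypothetical names for illustration and do not reproduce the `CrawlerBase` or `SqlBackend` APIs.

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Record:
    """Hypothetical minimal record: a path plus read/write flags."""

    path: str
    is_read: bool
    is_write: bool


class InMemoryBackend:
    """Hypothetical stand-in for a SQL backend: one list of rows per table."""

    def __init__(self) -> None:
        self.tables: dict[str, list[Record]] = defaultdict(list)

    def append(self, table: str, rows: list[Record]) -> None:
        self.tables[table].extend(rows)


class PushCrawler:
    """Receives records from the caller instead of crawling for them."""

    def __init__(self, backend: InMemoryBackend, table: str) -> None:
        self._backend = backend
        self._table = table

    def dump_all(self, rows: list[Record]) -> None:
        # Append mode: earlier rows are kept, new rows are added after them.
        self._backend.append(self._table, rows)


class Crawlers:
    """Factory handing out one push crawler per target table."""

    def __init__(self, backend: InMemoryBackend) -> None:
        self._backend = backend

    def for_paths(self) -> PushCrawler:
        return PushCrawler(self._backend, "directfs_in_paths")

    def for_queries(self) -> PushCrawler:
        return PushCrawler(self._backend, "directfs_in_queries")


backend = InMemoryBackend()
crawlers = Crawlers(backend)
crawlers.for_paths().dump_all([Record("dbfs:/mnt/a", True, False)])
crawlers.for_paths().dump_all([Record("dbfs:/mnt/b", False, True)])
crawlers.for_queries().dump_all([Record("dbfs:/mnt/c", True, False)])
```

A single producer can feed both tables in one pass this way, which is the reason the `dump_all` docstring gives for not using the pull model.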