-
Notifications
You must be signed in to change notification settings - Fork 87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added storage for direct filesystem references in code #2526
Changes from 95 commits
18950f1
224796d
b63cb19
6d911ca
5f07dc8
a8f3ecd
f0de866
505402f
992ffe7
830b026
a1e15e7
eb56636
b8e6b82
472c417
7345d67
3e256be
d1d251f
13ea1e6
93d194f
f4bc0b8
7ae9adc
fdf7a39
054f847
9c4a5bf
6889852
7792199
b4ba5ae
2646acc
361aa2e
a5416c0
c94ee81
4a30a79
bba91c9
a2f66c4
2da934e
353871f
e1329a6
5c444ff
4d14367
7f93d10
6258908
7192e85
46cd358
2652e00
d4be072
0f3941a
e552e3e
e30ccfa
2fd1d49
4c48951
6e07c9b
cd3b115
c3ee620
26096bd
8fea3eb
8ee8a77
64de1e0
de0c4f7
6ce8f86
5b980df
cdcc3e1
1515342
ca058ed
1c88c97
ef261a6
d951286
ea74ba6
e75e07d
e62fd18
b58f47d
5a5d4ff
404f6cd
cb6e45d
733decc
6338af6
166e34c
8bc3c39
0249310
b605b77
7f9fa06
f7108cb
abcab87
eb603e4
837fe6e
e80b6f0
f8cdc22
9d4bc86
697a85e
8cb4ac0
185d060
b09a549
1742415
c5987bb
b6fbeb4
cc194b6
a5ada24
4e45840
2ce8e42
f133b97
145fbae
810e356
64b8a84
9f24f97
b93b565
3f846a8
67f9d68
506a761
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
-- View combining direct filesystem accesses detected in workspace paths and in queries.
-- Columns are enumerated explicitly (instead of SELECT *) so that a schema change
-- which reorders columns cannot silently mis-align the two sides of the UNION.
SELECT
    path,
    is_read,
    is_write,
    source_id,
    source_timestamp,
    source_lineage,
    job_id,
    job_name,
    task_key,
    assessment_start_timestamp,
    assessment_end_timestamp
FROM direct_file_system_access_in_paths
UNION
SELECT
    path,
    is_read,
    is_write,
    source_id,
    source_timestamp,
    source_lineage,
    job_id,
    job_name,
    task_key,
    assessment_start_timestamp,
    assessment_end_timestamp
FROM direct_file_system_access_in_queries
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
from __future__ import annotations | ||
|
||
|
||
import logging | ||
from collections.abc import Sequence, Iterable | ||
from dataclasses import dataclass, field | ||
from datetime import datetime | ||
|
||
from databricks.labs.ucx.framework.crawlers import CrawlerBase, Result | ||
from databricks.labs.lsql.backends import SqlBackend | ||
from databricks.sdk.errors import DatabricksError | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@dataclass
class LineageAtom:
    """One node in the lineage chain leading to a source of a record."""

    # Kind of the referenced object (e.g. a job, task, notebook or query).
    object_type: str
    # Identifier of the object within its type.
    object_id: str
    # Optional extra attributes describing the object; None when there are none.
    other: dict[str, str] | None = None
|
||
|
||
@dataclass
class DirectFsAccess:
    """A record describing a Direct File System Access"""

    # Sentinel for source/job metadata that is not (yet) known.
    UNKNOWN = "unknown"

    path: str
    is_read: bool
    is_write: bool
    source_id: str = UNKNOWN
    source_timestamp: datetime = datetime.fromtimestamp(0)
    source_lineage: list[LineageAtom] = field(default_factory=list)
    job_id: int = -1
    job_name: str = UNKNOWN
    task_key: str = UNKNOWN
    assessment_start_timestamp: datetime = datetime.fromtimestamp(0)
    assessment_end_timestamp: datetime = datetime.fromtimestamp(0)

    def replace_source(
        self,
        source_id: str | None = None,
        source_lineage: list[LineageAtom] | None = None,
        source_timestamp: datetime | None = None,
    ) -> DirectFsAccess:
        """Return a copy of this record with the source attributes replaced.

        Arguments left as ``None`` keep the current value.
        """
        return DirectFsAccess(
            path=self.path,
            is_read=self.is_read,
            is_write=self.is_write,
            source_id=source_id or self.source_id,
            source_timestamp=source_timestamp or self.source_timestamp,
            source_lineage=source_lineage or self.source_lineage,
            job_id=self.job_id,
            job_name=self.job_name,
            task_key=self.task_key,
            assessment_start_timestamp=self.assessment_start_timestamp,
            # Bug fix: this previously copied assessment_start_timestamp,
            # losing the end timestamp on every copy.
            assessment_end_timestamp=self.assessment_end_timestamp,
        )

    def replace_job_infos(
        self,
        job_id: int | None = None,
        job_name: str | None = None,
        task_key: str | None = None,
    ) -> DirectFsAccess:
        """Return a copy of this record with the job attributes replaced.

        Arguments left as ``None`` keep the current value.
        """
        return DirectFsAccess(
            path=self.path,
            is_read=self.is_read,
            is_write=self.is_write,
            source_id=self.source_id,
            source_timestamp=self.source_timestamp,
            source_lineage=self.source_lineage,
            job_id=job_id or self.job_id,
            job_name=job_name or self.job_name,
            task_key=task_key or self.task_key,
            assessment_start_timestamp=self.assessment_start_timestamp,
            # Bug fix: was self.assessment_start_timestamp.
            assessment_end_timestamp=self.assessment_end_timestamp,
        )

    def replace_assessment_infos(
        self, assessment_start: datetime | None = None, assessment_end: datetime | None = None
    ) -> DirectFsAccess:
        """Return a copy of this record with the assessment timestamps replaced.

        Arguments left as ``None`` keep the current value.
        """
        return DirectFsAccess(
            path=self.path,
            is_read=self.is_read,
            is_write=self.is_write,
            source_id=self.source_id,
            source_timestamp=self.source_timestamp,
            source_lineage=self.source_lineage,
            job_id=self.job_id,
            job_name=self.job_name,
            task_key=self.task_key,
            assessment_start_timestamp=assessment_start or self.assessment_start_timestamp,
            # Bug fix: the fallback previously used assessment_start_timestamp,
            # so omitting assessment_end clobbered the stored end timestamp.
            assessment_end_timestamp=assessment_end or self.assessment_end_timestamp,
        )
|
||
|
||
class _DirectFsAccessCrawler(CrawlerBase):
    """Persists and fetches DirectFsAccess records in an inventory table."""

    def __init__(self, backend: SqlBackend, schema: str, table: str):
        """
        Initializes a DFSACrawler instance.

        Args:
            backend (SqlBackend): The SQL Execution Backend abstraction (either REST API or Spark)
            schema: The schema name for the inventory persistence.
            table: The table name for the inventory persistence.
        """
        super().__init__(backend, "hive_metastore", schema, table, DirectFsAccess)

    def append(self, dfsas: Sequence[DirectFsAccess]):
        """Append records to the table, best-effort: storage failures are logged, not raised."""
        try:
            self._append_records(dfsas)
        except DatabricksError as err:
            logger.error("Failed to store DFSAs", exc_info=err)

    def _try_fetch(self) -> Iterable[DirectFsAccess]:
        """Stream the previously stored records back from the table."""
        query = f"SELECT * FROM {self.full_name}"
        yield from self._backend.fetch(query)

    def _crawl(self) -> Iterable[Result]:
        # Records are pushed in via append() while the linter runs (it gathers
        # DFSAs alongside other results), so a fresh crawl yields nothing here.
        return []
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is where the linting process should be invoked and start from, and it should return the records that are going to be written into the table. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That works fine if the process is dedicated, but for jobs, the process collects simultaneously JobProblem's and DirectFsAccess's... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See above comment |
||
|
||
|
||
class DirectFsAccessCrawlers:
    """Factory for the crawlers that persist direct filesystem access records."""

    def __init__(self, sql_backend: SqlBackend, schema: str):
        self._sql_backend = sql_backend
        self._schema = schema

    def for_paths(self) -> _DirectFsAccessCrawler:
        """Crawler backed by the `direct_file_system_access_in_paths` table."""
        return _DirectFsAccessCrawler(self._sql_backend, self._schema, "direct_file_system_access_in_paths")

    def for_queries(self) -> _DirectFsAccessCrawler:
        """Crawler backed by the `direct_file_system_access_in_queries` table."""
        return _DirectFsAccessCrawler(self._sql_backend, self._schema, "direct_file_system_access_in_queries")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: as a good practice of views, please specify explicit column names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
… especially for
UNION
where the order of columns is important. (People often assume that schema changes which modify the column order are forward and backward compatible, and don't realise that UNIONs will break. The only solution is to defensively enumerate columns explicitly when performing a UNION
.)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dropped from this PR