Merge pull request #84 from JenspederM/fix/tarfile-filter

Fix/tarfile filter
JenspederM authored Dec 8, 2024
2 parents 193f3a6 + fb90075 commit d34ad69

Showing 4 changed files with 127 additions and 29 deletions.
19 changes: 14 additions & 5 deletions src/kedro_databricks/deploy.py
@@ -102,12 +102,19 @@ def upload_project_config(self, conf: str):  # pragma: no cover
         Args:
             conf (str): The conf folder.
         """
-        conf_tar = self.project_path / f"dist/conf-{self.package_name}.tar.gz"
-        with tarfile.open(conf_tar) as f:
-            f.extractall("dist/", filter="tar")
-
         target_path = f"dbfs:/FileStore/{self.package_name}/{conf}"
         source_path = self.project_path / "dist" / conf
+        conf_tar = self.project_path / f"dist/conf-{self.package_name}.tar.gz"
+
+        if not conf_tar.exists():
+            self.log.error("No files found")
+            raise FileNotFoundError(f"Configuration tar file {conf_tar} does not exist")
+
+        with tarfile.open(conf_tar) as tar:
+            file_names = tar.getnames()
+            for _file in file_names:
+                tar.extract(_file, source_path)
+
         if not source_path.exists():
             raise FileNotFoundError(f"Configuration path {source_path} does not exist")
 
@@ -179,7 +186,9 @@ def _gather_user_jobs(
         for job_name, job in all_jobs.items():
             is_dev = job_name.startswith("[dev")
             is_valid = self._is_valid_job(pipelines, job_name)
-            if (is_dev and username not in job_name) or not is_valid:
+            if (
+                is_dev and username not in job_name
+            ) or not is_valid:  # pragma: no cover
                 continue
             n = job_name.split(" - ")[0]
             link = JobLink(name=n, url=f"{job_host}/{job.job_id}", is_dev=is_dev)
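Why the extraction changed: the filter argument of TarFile.extractall comes from PEP 706 and exists only on Python 3.12+ plus selected security backports, so the old extractall("dist/", filter="tar") call raises TypeError on older interpreters. A minimal sketch of the portable pattern, not taken from this commit (the helper name and the version gate are illustrative):

    # Illustrative sketch, not from the commit: portable tar extraction.
    # The `filter` keyword (PEP 706) exists on Python 3.12+ and selected
    # security backports only; older interpreters raise TypeError on it.
    import sys
    import tarfile
    from pathlib import Path


    def extract_archive(archive: Path, dest: Path) -> None:
        with tarfile.open(archive) as tar:
            if sys.version_info >= (3, 12):
                # Filtered extraction blocks absolute paths and "../" escapes.
                tar.extractall(dest, filter="tar")
            else:
                # Per-member extraction works on every supported version.
                for member in tar.getnames():
                    tar.extract(member, dest)

The commit takes the per-member branch unconditionally, trading the traversal filter for compatibility across interpreter versions.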
8 changes: 4 additions & 4 deletions src/kedro_databricks/init.py
@@ -79,15 +79,15 @@ def bundle_init(self):
         ]
         try:
             result = Command(init_cmd, msg=MSG, warn=True).run()
-            self.log.info(f"{MSG}: Wrote {config_path.relative_to(self.project_path)}")
-            shutil.rmtree(assets_dir)
-            return result
         except subprocess.CalledProcessError as e:  # pragma: no cover
             if "Asset Bundle successfully created for project" not in e.stderr.decode(
                 "utf-8"
-            ):
+            ):  # pragma: no cover
                 shutil.rmtree(assets_dir)
                 raise e
+        self.log.info(f"{MSG}: Wrote {config_path.relative_to(self.project_path)}")
+
+        shutil.rmtree(assets_dir)
 
     def write_kedro_databricks_config(self, default_key: str, provider_name: str):
         MSG = "Creating bundle override configuration"
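The bundle_init reshuffle keeps only the fallible Command(...).run() inside the try, so the success log and shutil.rmtree cleanup now run both on a clean exit and when the benign "Asset Bundle successfully created" error is swallowed. A generic sketch of that shape, with hypothetical names and marker string (not from this repo):

    # Hypothetical helper showing the pattern: only the call that can fail
    # lives in the try block; logging and cleanup happen afterwards.
    import shutil
    import subprocess


    def run_and_clean(cmd: list[str], scratch_dir: str) -> None:
        try:
            subprocess.run(cmd, check=True, capture_output=True)
        except subprocess.CalledProcessError as e:
            if b"known benign marker" not in e.stderr:  # re-raise real failures
                shutil.rmtree(scratch_dir)
                raise
        print("command finished")  # reached on success or on a swallowed error
        shutil.rmtree(scratch_dir)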
57 changes: 38 additions & 19 deletions src/kedro_databricks/utils.py
@@ -5,7 +5,7 @@
 import re
 import shutil
 import subprocess
-from typing import Any
+from typing import IO, Any
 
 from kedro import __version__ as kedro_version
 
@@ -88,24 +88,43 @@ def __rich_repr__(self):  # pragma: no cover
 
     def run(self, *args):
         cmd = self.command + list(*args)
-        result = subprocess.run(cmd, check=False, capture_output=True)
-        if result.returncode != 0:
-            error_msg = self._get_error_message(result)
-
-            if self.warn:
-                self.log.warning(f"{self.msg}: {self.command}\n{error_msg}")
-                return result
-
-            raise RuntimeError(f"{self.msg}: {self.command}\n{error_msg}")
-        return result
-
-    def _get_error_message(self, result):  # pragma: no cover
-        error_msg = result.stderr.decode("utf-8").strip()
-        if not error_msg:
-            error_msg = result.stdout.decode("utf-8").strip()
-        if not error_msg:
-            error_msg = f"Command failed with return code {result.returncode}"
-        return error_msg
+        self.log.info(f"Running command: {cmd}")
+        with subprocess.Popen(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        ) as popen:
+            stdout = self._read(popen.stdout, self.log.info)
+            stderr = self._read(popen.stderr, self.log.error)
+            return_code = popen.wait()
+            if return_code != 0:
+                self._handle_error(stdout, stderr)
+
+            return subprocess.CompletedProcess(
+                args=cmd,
+                returncode=return_code,
+                stdout=stdout,
+                stderr=stderr or "",
+            )
+
+    def _read(self, io: IO, log_func: Any) -> list[str]:
+        lines = []
+        while True:
+            line = io.readline().decode("utf-8", errors="replace").strip()
+            if not line:
+                break
+            log_func(f"{self}: {line}")
+            lines.append(line)
+        return lines
+
+    def _handle_error(self, stdout: list[str], stderr: list[str]):
+        error_msg = "\n".join(stderr)
+        if not error_msg:  # pragma: no cover
+            error_msg = "\n".join(stdout)
+        if self.warn:
+            self.log.warning(f"{self.msg} ({self.command}): {error_msg}")
+        else:
+            raise RuntimeError(f"{self.msg} ({self.command}): {error_msg}")
 
 
 def make_workflow_name(package_name, pipeline_name: str) -> str:
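The new Command.run swaps the buffered capture of subprocess.run for a Popen loop that logs each output line as it is produced, so long-running databricks CLI calls show live progress instead of going silent. A self-contained sketch of the same streaming idea (function and log prefix are illustrative, not the plugin's API):

    # Sketch, assuming nothing beyond the standard library: stream a child
    # process's stdout line by line while it runs, then collect the result.
    import subprocess


    def stream_command(cmd: list[str]) -> subprocess.CompletedProcess:
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        ) as proc:
            out_lines: list[str] = []
            # iter(readline, b"") stops at EOF rather than at the first blank line.
            for raw in iter(proc.stdout.readline, b""):
                line = raw.decode("utf-8", errors="replace").rstrip()
                print(f"live: {line}")
                out_lines.append(line)
            stderr = proc.stderr.read().decode("utf-8", errors="replace")
            return_code = proc.wait()
        return subprocess.CompletedProcess(cmd, return_code, out_lines, stderr)

One caveat that applies to this sketch and the committed code alike: draining stdout to EOF before reading stderr can block if the child fills the stderr pipe buffer; merging the streams with stderr=subprocess.STDOUT sidesteps that.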
72 changes: 71 additions & 1 deletion tests/integration/test_deploy.py
@@ -1,14 +1,77 @@
+from kedro.pipeline import Pipeline, node
+
+from kedro_databricks.deploy import DeployController
 from kedro_databricks.plugin import commands
 from tests.utils import reset_init
 
 
+def identity(arg):
+    return arg
+
+
+pipeline = Pipeline(
+    [
+        node(
+            identity,
+            ["input"],
+            ["intermediate"],
+            name="node0",
+            tags=["tag0", "tag1"],
+        ),
+        node(identity, ["intermediate"], ["output"], name="node1"),
+        node(identity, ["intermediate"], ["output2"], name="node2", tags=["tag0"]),
+        node(
+            identity,
+            ["intermediate"],
+            ["output3"],
+            name="node3",
+            tags=["tag1", "tag2"],
+        ),
+        node(identity, ["intermediate"], ["output4"], name="node4", tags=["tag2"]),
+    ],
+    tags="pipeline0",
+)
+
+pipelines = {
+    "__default__": pipeline,
+    "ds": pipeline,
+}
+
+
 def test_deploy(cli_runner, metadata):
     """Test the `deploy` command"""
     reset_init(metadata)
     deploy_fail = ["databricks", "deploy"]
     result = cli_runner.invoke(commands, deploy_fail, obj=metadata)
     assert result.exit_code == 1, (result.exit_code, result.stdout)
 
+    init_cmd = ["databricks", "init", "--provider", "1"]
+    result = cli_runner.invoke(commands, init_cmd, obj=metadata)
+    override_path = metadata.project_path / "conf" / "base" / "databricks.yml"
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert metadata.project_path.exists(), "Project path not created"
+    assert metadata.project_path.is_dir(), "Project path is not a directory"
+    assert override_path.exists(), "Override file not created"
+
+    deploy_cmd = ["databricks", "deploy", "--bundle"]
+    result = cli_runner.invoke(commands, deploy_cmd, obj=metadata)
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+
+    controller = DeployController(metadata)
+    resources = controller.log_deployed_resources(pipelines, only_dev=True)
+    assert len(resources) > 0, f"There are no resources: {resources}"
+    assert all(
+        metadata.package_name in p.name for p in resources
+    ), f"Package name not in resource: {[p.name for p in resources if metadata.package_name not in p.name]}"
+
+
+def test_deploy_prod(cli_runner, metadata):
+    """Test the `deploy` command"""
+    reset_init(metadata)
+    deploy_fail = ["databricks", "deploy"]
+    result = cli_runner.invoke(commands, deploy_fail, obj=metadata)
+    assert result.exit_code == 1, (result.exit_code, result.stdout)
+
     init_cmd = ["databricks", "init"]
     result = cli_runner.invoke(commands, init_cmd, obj=metadata)
     override_path = metadata.project_path / "conf" / "base" / "databricks.yml"
@@ -17,10 +80,17 @@ def test_deploy(cli_runner, metadata):
     assert metadata.project_path.is_dir(), "Project path is not a directory"
     assert override_path.exists(), "Override file not created"
 
-    deploy_cmd = ["databricks", "deploy", "--bundle", "--debug"]
+    deploy_cmd = ["databricks", "deploy", "--env", "prod", "--bundle", "--debug"]
     result = cli_runner.invoke(commands, deploy_cmd, obj=metadata)
     assert result.exit_code == 0, (result.exit_code, result.stdout)
 
+    controller = DeployController(metadata)
+    resources = controller.log_deployed_resources(pipelines)
+    assert len(resources) > 0, f"There are no resources: {resources}"
+    assert all(
+        metadata.package_name in p.name for p in resources
+    ), f"Package name not in resource: {[p.name for p in resources if metadata.package_name not in p.name]}"
+
 
 def test_deploy_with_conf(cli_runner, metadata):
     """Test the `deploy` command"""
