diff --git a/src/kedro_databricks/deploy.py b/src/kedro_databricks/deploy.py
index f5d31eb..3e66b01 100644
--- a/src/kedro_databricks/deploy.py
+++ b/src/kedro_databricks/deploy.py
@@ -102,12 +102,19 @@ def upload_project_config(self, conf: str):  # pragma: no cover
         Args:
             conf (str): The conf folder.
         """
-        conf_tar = self.project_path / f"dist/conf-{self.package_name}.tar.gz"
-        with tarfile.open(conf_tar) as f:
-            f.extractall("dist/", filter="tar")
-
         target_path = f"dbfs:/FileStore/{self.package_name}/{conf}"
         source_path = self.project_path / "dist" / conf
+        conf_tar = self.project_path / f"dist/conf-{self.package_name}.tar.gz"
+
+        if not conf_tar.exists():
+            self.log.error("No files found")
+            raise FileNotFoundError(f"Configuration tar file {conf_tar} does not exist")
+
+        with tarfile.open(conf_tar) as tar:
+            file_names = tar.getnames()
+            for _file in file_names:
+                tar.extract(_file, source_path)
+
         if not source_path.exists():
             raise FileNotFoundError(f"Configuration path {source_path} does not exist")
 
@@ -179,7 +186,9 @@ def _gather_user_jobs(
         for job_name, job in all_jobs.items():
             is_dev = job_name.startswith("[dev")
             is_valid = self._is_valid_job(pipelines, job_name)
-            if (is_dev and username not in job_name) or not is_valid:
+            if (
+                is_dev and username not in job_name
+            ) or not is_valid:  # pragma: no cover
                 continue
             n = job_name.split(" - ")[0]
             link = JobLink(name=n, url=f"{job_host}/{job.job_id}", is_dev=is_dev)
diff --git a/src/kedro_databricks/init.py b/src/kedro_databricks/init.py
index 78603d5..05ba191 100644
--- a/src/kedro_databricks/init.py
+++ b/src/kedro_databricks/init.py
@@ -79,15 +79,15 @@
         ]
         try:
             result = Command(init_cmd, msg=MSG, warn=True).run()
+            self.log.info(f"{MSG}: Wrote {config_path.relative_to(self.project_path)}")
+            shutil.rmtree(assets_dir)
             return result
         except subprocess.CalledProcessError as e:  # pragma: no cover
             if "Asset Bundle successfully created for project" not in e.stderr.decode(
                 "utf-8"
-            ):
+            ):  # pragma: no cover
+                shutil.rmtree(assets_dir)
                 raise e
-        self.log.info(f"{MSG}: Wrote {config_path.relative_to(self.project_path)}")
-
-        shutil.rmtree(assets_dir)
 
     def write_kedro_databricks_config(self, default_key: str, provider_name: str):
         MSG = "Creating bundle override configuration"
diff --git a/src/kedro_databricks/utils.py b/src/kedro_databricks/utils.py
index 95ce6c0..7b3f571 100644
--- a/src/kedro_databricks/utils.py
+++ b/src/kedro_databricks/utils.py
@@ -5,7 +5,7 @@
 import re
 import shutil
 import subprocess
-from typing import Any
+from typing import IO, Any
 
 from kedro import __version__ as kedro_version
 
@@ -88,24 +88,43 @@ def __rich_repr__(self):  # pragma: no cover
 
     def run(self, *args):
         cmd = self.command + list(*args)
-        result = subprocess.run(cmd, check=False, capture_output=True)
-        if result.returncode != 0:
-            error_msg = self._get_error_message(result)
-
-            if self.warn:
-                self.log.warning(f"{self.msg}: {self.command}\n{error_msg}")
-                return result
-
-            raise RuntimeError(f"{self.msg}: {self.command}\n{error_msg}")
-        return result
-
-    def _get_error_message(self, result):  # pragma: no cover
-        error_msg = result.stderr.decode("utf-8").strip()
-        if not error_msg:
-            error_msg = result.stdout.decode("utf-8").strip()
-        if not error_msg:
-            error_msg = f"Command failed with return code {result.returncode}"
-        return error_msg
+        self.log.info(f"Running command: {cmd}")
+        with subprocess.Popen(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+        ) as popen:
+            stdout = self._read(popen.stdout, self.log.info)
+            stderr = self._read(popen.stderr, self.log.error)
+            return_code = popen.wait()
+            if return_code != 0:
+                self._handle_error(stdout, stderr)
+
+        return subprocess.CompletedProcess(
+            args=cmd,
+            returncode=return_code,
+            stdout=stdout,
+            stderr=stderr or "",
+        )
+
+    def _read(self, io: IO, log_func: Any) -> list[str]:
+        lines = []
+        while True:
+            line = io.readline().decode("utf-8", errors="replace").strip()
+            if not line:
+                break
+            log_func(f"{self}: {line}")
+            lines.append(line)
+        return lines
+
+    def _handle_error(self, stdout: list[str], stderr: list[str]):
+        error_msg = "\n".join(stderr)
+        if not error_msg:  # pragma: no cover
+            error_msg = "\n".join(stdout)
+        if self.warn:
+            self.log.warning(f"{self.msg} ({self.command}): {error_msg}")
+        else:
+            raise RuntimeError(f"{self.msg} ({self.command}): {error_msg}")
 
 
 def make_workflow_name(package_name, pipeline_name: str) -> str:
diff --git a/tests/integration/test_deploy.py b/tests/integration/test_deploy.py
index 61126b6..369acc5 100644
--- a/tests/integration/test_deploy.py
+++ b/tests/integration/test_deploy.py
@@ -1,7 +1,43 @@
+from kedro.pipeline import Pipeline, node
+
+from kedro_databricks.deploy import DeployController
 from kedro_databricks.plugin import commands
 from tests.utils import reset_init
 
 
+def identity(arg):
+    return arg
+
+
+pipeline = Pipeline(
+    [
+        node(
+            identity,
+            ["input"],
+            ["intermediate"],
+            name="node0",
+            tags=["tag0", "tag1"],
+        ),
+        node(identity, ["intermediate"], ["output"], name="node1"),
+        node(identity, ["intermediate"], ["output2"], name="node2", tags=["tag0"]),
+        node(
+            identity,
+            ["intermediate"],
+            ["output3"],
+            name="node3",
+            tags=["tag1", "tag2"],
+        ),
+        node(identity, ["intermediate"], ["output4"], name="node4", tags=["tag2"]),
+    ],
+    tags="pipeline0",
+)
+
+pipelines = {
+    "__default__": pipeline,
+    "ds": pipeline,
+}
+
+
 def test_deploy(cli_runner, metadata):
     """Test the `deploy` command"""
     reset_init(metadata)
@@ -9,6 +45,33 @@
     result = cli_runner.invoke(commands, deploy_fail, obj=metadata)
     assert result.exit_code == 1, (result.exit_code, result.stdout)
 
+    init_cmd = ["databricks", "init", "--provider", "1"]
+    result = cli_runner.invoke(commands, init_cmd, obj=metadata)
+    override_path = metadata.project_path / "conf" / "base" / "databricks.yml"
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+    assert metadata.project_path.exists(), "Project path not created"
+    assert metadata.project_path.is_dir(), "Project path is not a directory"
+    assert override_path.exists(), "Override file not created"
+
+    deploy_cmd = ["databricks", "deploy", "--bundle"]
+    result = cli_runner.invoke(commands, deploy_cmd, obj=metadata)
+    assert result.exit_code == 0, (result.exit_code, result.stdout)
+
+    controller = DeployController(metadata)
+    resources = controller.log_deployed_resources(pipelines, only_dev=True)
+    assert len(resources) > 0, f"There are no resources: {resources}"
+    assert all(
+        metadata.package_name in p.name for p in resources
+    ), f"Package name not in resource: {[p.name for p in resources if metadata.package_name not in p.name]}"
+
+
+def test_deploy_prod(cli_runner, metadata):
+    """Test the `deploy` command"""
+    reset_init(metadata)
+    deploy_fail = ["databricks", "deploy"]
+    result = cli_runner.invoke(commands, deploy_fail, obj=metadata)
+    assert result.exit_code == 1, (result.exit_code, result.stdout)
+
     init_cmd = ["databricks", "init"]
     result = cli_runner.invoke(commands, init_cmd, obj=metadata)
     override_path = metadata.project_path / "conf" / "base" / "databricks.yml"
@@ -17,10 +80,17 @@ def test_deploy(cli_runner, metadata):
     assert metadata.project_path.is_dir(), "Project path is not a directory"
     assert override_path.exists(), "Override file not created"
 
-    deploy_cmd = ["databricks", "deploy", "--bundle", "--debug"]
+    deploy_cmd = ["databricks", "deploy", "--env", "prod", "--bundle", "--debug"]
     result = cli_runner.invoke(commands, deploy_cmd, obj=metadata)
     assert result.exit_code == 0, (result.exit_code, result.stdout)
 
+    controller = DeployController(metadata)
+    resources = controller.log_deployed_resources(pipelines)
+    assert len(resources) > 0, f"There are no resources: {resources}"
+    assert all(
+        metadata.package_name in p.name for p in resources
+    ), f"Package name not in resource: {[p.name for p in resources if metadata.package_name not in p.name]}"
+
 
 def test_deploy_with_conf(cli_runner, metadata):
     """Test the `deploy` command"""