Skip to content

Commit

Permalink
Merge pull request #80 from JenspederM/fix/add-support-for-vars
Browse files Browse the repository at this point in the history
fix: add support for vars
  • Loading branch information
JenspederM authored Nov 28, 2024
2 parents 9dcf09c + 9662c15 commit 5a24a9d
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 68 deletions.
66 changes: 32 additions & 34 deletions src/kedro_databricks/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kedro.framework.project import pipelines as kedro_pipelines
from kedro.framework.startup import ProjectMetadata

from kedro_databricks.utils import make_workflow_name, run_cmd
from kedro_databricks.utils import Command, make_workflow_name

_INVALID_CONFIG_MSG = """
No `databricks.yml` file found. Maybe you forgot to initialize the Databricks bundle?
Expand Down Expand Up @@ -64,11 +64,8 @@ def validate_databricks_config(self):

def create_dbfs_dir(self): # pragma: no cover
"""Create a directory in DBFS."""
run_cmd(
["databricks", "fs", "mkdirs", f"dbfs:/FileStore/{self.package_name}"],
msg=self._msg,
warn=True,
)
cmd = ["databricks", "fs", "mkdirs", f"dbfs:/FileStore/{self.package_name}"]
Command(cmd, msg=self._msg, warn=True).run()

def upload_project_data(self): # pragma: no cover
"""Upload the project data to DBFS.
Expand All @@ -86,19 +83,18 @@ def upload_project_data(self): # pragma: no cover
self.log.info(
f"{self._msg}: Uploading {source_path.relative_to(self.project_path)} to {target_path}"
)
run_cmd(
[
"databricks",
"fs",
"cp",
"-r",
"--overwrite",
source_path.as_posix(),
target_path,
],
msg=self._msg,
)
cmd = [
"databricks",
"fs",
"cp",
"-r",
"--overwrite",
source_path.as_posix(),
target_path,
]
result = Command(cmd, msg=self._msg).run()
self.log.info(f"{self._msg}: Data uploaded to {target_path}")
return result

def upload_project_config(self, conf: str): # pragma: no cover
"""Upload the project configuration to DBFS.
Expand All @@ -116,43 +112,45 @@ def upload_project_config(self, conf: str): # pragma: no cover
raise FileNotFoundError(f"Configuration path {source_path} does not exist")

self.log.info(f"{self._msg}: Uploading configuration to {target_path}")
run_cmd(
[
"databricks",
"fs",
"cp",
"-r",
"--overwrite",
source_path.as_posix(),
target_path,
],
msg=self._msg,
)
cmd = [
"databricks",
"fs",
"cp",
"-r",
"--overwrite",
source_path.as_posix(),
target_path,
]
result = Command(cmd, msg=self._msg).run()
self.log.info(f"{self._msg}: Configuration uploaded to {target_path}")
return result

def build_project(self): # pragma: no cover
"""Build the project."""
self.log.info(f"{self._msg}: Building the project")
self.go_to_project()
build_cmd = ["kedro", "package"]
result = run_cmd(build_cmd, msg=self._msg)
result = Command(build_cmd, msg=self._msg).run()
return result

def deploy_project(self, target: str, debug: bool = False):
def deploy_project(self, target: str, debug: bool = False, var: list[str] = []):
"""Deploy the project to Databricks.
Args:
target (str): Databricks target environment to deploy to.
debug (bool): Whether to enable debug mode.
var (list[str]): Bundle variables, each passed to the CLI as a `--var` flag (e.g. "foo=bar").
"""
self.log.info(
f"{self._msg}: Running `databricks bundle deploy --target {target}`"
)
deploy_cmd = ["databricks", "bundle", "deploy", "--target", target]
_var = [_v for v in var for _v in ["--var", v]]
deploy_cmd = ["databricks", "bundle", "deploy", "--target", target, *_var]
if debug:
deploy_cmd.append("--debug")
run_cmd(deploy_cmd, msg=self._msg)
result = Command(deploy_cmd, msg=self._msg).run()
self.log_deployed_resources(only_dev=target in ["dev", "local"])
return result

def log_deployed_resources(
self, pipelines: _ProjectPipelines = kedro_pipelines, only_dev: bool = False
Expand Down
5 changes: 3 additions & 2 deletions src/kedro_databricks/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import tomlkit
from kedro.framework.startup import ProjectMetadata

from kedro_databricks.utils import has_databricks_cli, run_cmd
from kedro_databricks.utils import Command, has_databricks_cli

NODE_TYPE_MAP = {
"aws": "m5.xlarge",
Expand Down Expand Up @@ -78,7 +78,8 @@ def bundle_init(self):
self.project_path.as_posix(),
]
try:
run_cmd(init_cmd, msg=MSG, warn=True)
result = Command(init_cmd, msg=MSG, warn=True).run()
return result
except subprocess.CalledProcessError as e: # pragma: no cover
if "Asset Bundle successfully created for project" not in e.stderr.decode(
"utf-8"
Expand Down
10 changes: 9 additions & 1 deletion src/kedro_databricks/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ def bundle(
@click.option("-c", "--conf", default=DEFAULT_CONF_FOLDER, help=CONF_HELP)
@click.option("-d", "--debug/--no-debug", default=False, help="Enable debug mode")
@click.option("-p", "--pipeline", default=None, help="Bundle a single pipeline")
@click.option(
"-v",
"--var",
default=[],
help='set values for variables defined in bundle config. Example: --var="foo=bar"',
multiple=True,
)
@click.pass_obj
def deploy(
metadata: ProjectMetadata,
Expand All @@ -126,6 +133,7 @@ def deploy(
conf: str,
pipeline: str,
debug: bool,
var: list[str],
):
"""Deploy the asset bundle to Databricks"""

Expand All @@ -145,4 +153,4 @@ def deploy(
controller.upload_project_data()
if target is None:
target = env
controller.deploy_project(target=target, debug=debug)
controller.deploy_project(target=target, debug=debug, var=var)
64 changes: 42 additions & 22 deletions src/kedro_databricks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ def get_entry_point(project_name: str) -> str:
"""
entrypoint = project_name.strip().lower()
entrypoint = re.sub(r" +", " ", entrypoint)
return re.sub(r"[^a-zA-Z]", "-", entrypoint)
entrypoint = re.sub(r"[^a-zA-Z]", "-", entrypoint)
entrypoint = re.sub(r"(-+)$", "", entrypoint)
entrypoint = re.sub(r"^(-+)", "", entrypoint)
return entrypoint


def require_databricks_run_script() -> bool:
def require_databricks_run_script(_version=KEDRO_VERSION) -> bool:
"""Check if the current Kedro version is less than 0.19.8.
Kedro 0.19.8 introduced a new `run_script` method that is required for
Expand All @@ -61,31 +64,48 @@ def require_databricks_run_script() -> bool:
Returns:
bool: whether `_version` (by default the current Kedro version) is less than 0.19.8
"""
return KEDRO_VERSION < [0, 19, 8]
return _version < [0, 19, 8]


def run_cmd(
cmd: list[str], msg: str = "Failed to run command", warn: bool = False
) -> subprocess.CompletedProcess | None:
"""Run a shell command.
class Command:
def __init__(
self, command: list[str], warn: bool = False, msg: str = "Error when running"
):
self.log = logging.getLogger(self.__class__.__name__)
self.command = command
self.warn = warn
self.msg = msg

Args:
cmds (List[str]): list of commands to run
msg (str, optional): message to raise if the command fails
warn (bool): whether to log a warning if the command fails
"""
def __str__(self):
return f"Command({self.command})"

def __repr__(self):
return self.__str__()

def __rich_repr__(self): # pragma: no cover
yield "program", self.command[0]
yield "args", self.command[1:]

try:
result = subprocess.run(cmd, stdout=subprocess.PIPE, check=True)
for line in result.stdout.decode().split("\n"):
logging.info(line)
def run(self, *args):
cmd = self.command + list(*args)
result = subprocess.run(cmd, check=False, capture_output=True)
if result.returncode != 0:
error_msg = self._get_error_message(result)

if self.warn:
self.log.warning(f"{self.msg}: {self.command}\n{error_msg}")
return result

raise RuntimeError(f"{self.msg}: {self.command}\n{error_msg}")
return result
except Exception as e:
if warn:
logging.warning(f"{msg}: {e}")
return None
else:
raise Exception(f"{msg}: {e}")

def _get_error_message(self, result): # pragma: no cover
error_msg = result.stderr.decode("utf-8").strip()
if not error_msg:
error_msg = result.stdout.decode("utf-8").strip()
if not error_msg:
error_msg = f"Command failed with return code {result.returncode}"
return error_msg


def make_workflow_name(package_name, pipeline_name: str) -> str:
Expand Down
Loading

0 comments on commit 5a24a9d

Please sign in to comment.