Merge pull request #5 from JenspederM/fix/init
skip initialization if config already exists
JenspederM authored Jul 11, 2024
2 parents 5fd0b59 + 0c31098 commit a7e75d0
Showing 9 changed files with 115 additions and 59 deletions.
40 changes: 40 additions & 0 deletions scripts/mkdev.sh
@@ -0,0 +1,40 @@
+#!/usr/bin/env bash
+
+CUR_PATH=$(pwd)
+VERSION=$(grep -m 1 version pyproject.toml | tr -s ' ' | tr -d '"' | tr -d "'" | cut -d' ' -f3)
+WHL=kedro_databricks-$VERSION-py3-none-any.whl
+
+if [ -z "$1" ]; then
+    echo "Usage: $0 <project_name>"
+    exit 1
+fi
+
+if test -d "$CUR_PATH/$1"; then
+    echo "Directory $1 already exists. Removing it."
+    rm -rf "$CUR_PATH/$1"
+fi
+
+# Build package
+rye build
+
+# Create a new project
+kedro new --starter=databricks-iris --name="$1"
+
+# Databricks needs Java
+echo "java openjdk-21" >> "$CUR_PATH/$1/.tool-versions"
+
+# Copy the package to the project directory
+cp "dist/$WHL" "$CUR_PATH/$1/$WHL"
+
+# Move to the project directory
+pushd "$CUR_PATH/$1"
+# Create a virtual environment
+python3 -m venv .venv
+# Activate the virtual environment
+source .venv/bin/activate
+# Install the project dependencies
+pip install --upgrade pip
+pip install -r requirements.txt
+pip install $WHL
+code .
+popd
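
For context, a typical invocation of this new dev helper might look like the following; the project name `demo-project` is purely illustrative, and the script assumes `rye`, `kedro`, and VS Code's `code` command are on your PATH:

    bash scripts/mkdev.sh demo-project

This builds the plugin wheel, scaffolds a throwaway `databricks-iris` starter project, installs the wheel into a fresh `.venv` inside it, and opens the project in VS Code.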
1 change: 0 additions & 1 deletion src/kedro_databricks/__init__.py
@@ -1 +0,0 @@
-LOGGING_NAME = "kedro_databricks"
10 changes: 5 additions & 5 deletions src/kedro_databricks/bundle.py
@@ -3,9 +3,9 @@
 from typing import Any

-from kedro.framework.project import PACKAGE_NAME
+from kedro.framework.startup import ProjectMetadata
 from kedro.pipeline import Pipeline, node

-from kedro_databricks import LOGGING_NAME
 from kedro_databricks.utils import (
     TASK_KEY_ORDER,
     WORKFLOW_KEY_ORDER,
@@ -15,8 +15,6 @@

 DEFAULT = "default"

-log = logging.getLogger(LOGGING_NAME)
-

 def _create_task(name: str, depends_on: list[node]) -> dict[str, Any]:
     """Create a Databricks task for a given node.
@@ -225,7 +223,7 @@ def apply_resource_overrides(


 def generate_resources(
-    pipelines: dict[str, Pipeline], package_name=PACKAGE_NAME
+    pipelines: dict[str, Pipeline], metadata: ProjectMetadata
 ) -> dict[str, dict[str, Any]]:
     """Generate Databricks resources for the given pipelines.
@@ -238,13 +236,15 @@
     Returns:
         dict[str, dict[str, Any]]: A dictionary of pipeline names and their Databricks resources
     """
+    log = logging.getLogger(metadata.package_name)
+
+    package = metadata.package_name
     workflows = {}
     for name, pipeline in pipelines.items():
         if len(pipeline.nodes) == 0:
             continue

-        wf_name = f"{package_name}_{name}" if name != "__default__" else package_name
+        wf_name = f"{package}_{name}" if name != "__default__" else package
         wf = _create_workflow(wf_name, pipeline)
         log.debug(f"Workflow '{wf_name}' created successfully.")
         log.debug(wf)
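
With `generate_resources` now receiving the full `ProjectMetadata`, workflow names are derived from the project's package name rather than the imported `PACKAGE_NAME` constant, and logging goes through a per-package logger. A rough sketch of the naming rule (the package name `my_project` and pipeline name `etl` are hypothetical):

    # pipelines {"__default__": ..., "etl": ...} in package "my_project" yield:
    #   "__default__" -> workflow "my_project"
    #   "etl"         -> workflow "my_project_etl"

Pipelines with no nodes are skipped, so they produce no Databricks workflow.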
42 changes: 26 additions & 16 deletions src/kedro_databricks/deploy.py
@@ -7,26 +7,24 @@

 from kedro.framework.startup import ProjectMetadata

-from kedro_databricks import LOGGING_NAME
-
-log = logging.getLogger(LOGGING_NAME)
-

 def deploy_to_databricks(
     metadata: ProjectMetadata,
     env: str,
     bundle: bool = True,
+    debug: bool = False,
 ):
+    log = logging.getLogger(metadata.package_name)
     if shutil.which("databricks") is None:  # pragma: no cover
         raise Exception("databricks CLI is not installed")

     project_path = _go_to_project(metadata.project_path)
     _validate_databricks_config(project_path)
     _build_project(metadata)
     if bundle is True:
-        _bundle_project(env)
-    _upload_project_config(metadata.package_name, project_path)
-    _upload_project_data(metadata.package_name, project_path)
+        _bundle_project(metadata, env)
+    _upload_project_config(metadata)
+    _upload_project_data(metadata)
     deploy_cmd = ["databricks", "bundle", "deploy", "--target", env]
     result = subprocess.run(deploy_cmd, check=True, capture_output=True)
     if result.returncode != 0:
@@ -50,37 +48,47 @@ def _validate_databricks_config(project_path):
     return True


-def _upload_project_data(package_name, project_path):  # pragma: no cover
+def _upload_project_data(metadata: ProjectMetadata):  # pragma: no cover
+    log = logging.getLogger(metadata.package_name)
     log.info("Upload project data to Databricks...")
-    data_path = project_path / "data"
+    data_path = metadata.project_path / "data"
     if data_path.exists():
         copy_data_cmd = [
             "databricks",
             "fs",
             "cp",
             "-r",
             str(data_path),
-            f"dbfs:/FileStore/{package_name}/data",
+            f"dbfs:/FileStore/{metadata.package_name}/data",
         ]
         result = subprocess.run(copy_data_cmd, check=False, capture_output=True)
         if result.returncode != 0:
             raise Exception(f"Failed to copy data to Databricks: {result.stderr}")


-def _upload_project_config(package_name, project_path):  # pragma: no cover
+def _upload_project_config(metadata: ProjectMetadata):  # pragma: no cover
+    log = logging.getLogger(metadata.package_name)
     log.info("Upload project configuration to Databricks...")
-    with tarfile.open(project_path / f"dist/conf-{package_name}.tar.gz") as f:
+    with tarfile.open(
+        metadata.project_path / f"dist/conf-{metadata.package_name}.tar.gz"
+    ) as f:
         f.extractall("dist/")

     try:
-        remove_cmd = ["databricks", "fs", "rm", "-r", f"dbfs:/FileStore/{package_name}"]
+        remove_cmd = [
+            "databricks",
+            "fs",
+            "rm",
+            "-r",
+            f"dbfs:/FileStore/{metadata.package_name}",
+        ]
         result = subprocess.run(remove_cmd, check=False)
         if result.returncode != 0:
             log.warning(f"Failed to remove existing project: {result.stderr}")
     except Exception as e:
         log.warning(f"Failed to remove existing project: {e}")

-    conf_path = project_path / "dist" / "conf"
+    conf_path = metadata.project_path / "dist" / "conf"
     if not conf_path.exists():
         raise FileNotFoundError(f"Configuration path {conf_path} does not exist")

@@ -90,14 +98,15 @@ def _upload_project_config(package_name, project_path):  # pragma: no cover
         "cp",
         "-r",
         str(conf_path),
-        f"dbfs:/FileStore/{package_name}/conf",
+        f"dbfs:/FileStore/{metadata.package_name}/conf",
     ]
     result = subprocess.run(copy_conf_cmd, check=False, capture_output=True)
     if result.returncode != 0:
         raise Exception(f"Failed to copy configuration to Databricks: {result.stderr}")


-def _bundle_project(env):
+def _bundle_project(metadata: ProjectMetadata, env):
+    log = logging.getLogger(metadata.package_name)
     log.info("Bundling the project...")
     bundle_cmd = ["kedro", "databricks", "bundle", "--env", env]
     result = subprocess.run(bundle_cmd, capture_output=True, check=True)
@@ -106,6 +115,7 @@ def _bundle_project(env):


 def _build_project(metadata: ProjectMetadata):
+    log = logging.getLogger(metadata.package_name)
     log.info("Building the project...")
     _go_to_project(metadata.project_path)
     build_cmd = ["kedro", "package"]
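
Because the upload helpers now derive paths from `ProjectMetadata`, project files land under `dbfs:/FileStore/<package_name>`. A quick way to sanity-check a deployment from the shell (the package name `my_project` is illustrative):

    databricks fs ls dbfs:/FileStore/my_project         # expect conf/ and data/
    databricks fs ls dbfs:/FileStore/my_project/conf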
31 changes: 21 additions & 10 deletions src/kedro_databricks/init.py
@@ -5,9 +5,7 @@
 import tempfile
 from pathlib import Path

-from kedro_databricks import LOGGING_NAME
-
-log = logging.getLogger(LOGGING_NAME)
+from kedro.framework.startup import ProjectMetadata

 _bundle_config_template = """
 # This is a Databricks asset bundle definition for dab.
@@ -85,7 +83,7 @@
 }

 _bundle_override_template = """
-# Files named ´databricks*´ or `databricks/**` will be used to apply overrides to the
+# Files named `databricks*` or `databricks/**` will be used to apply overrides to the
 # generated asset bundle resources. The overrides should be specified according to the
 # Databricks REST API's `Create a new job` endpoint. To learn more, visit their
 # documentation at https://docs.databricks.com/api/workspace/jobs/create
@@ -105,11 +103,16 @@
 """


-def create_databricks_config(path: str, package_name: str):
+def write_bundle_template(metadata: ProjectMetadata):
+    log = logging.getLogger(metadata.package_name)
+    log.info("Creating Databricks asset bundle configuration...")
     if shutil.which("databricks") is None:  # pragma: no cover
         raise Exception("databricks CLI is not installed")

-    config = {"project_name": package_name, "project_slug": package_name}
+    config = {
+        "project_name": metadata.package_name,
+        "project_slug": metadata.package_name,
+    }

     assets_dir = tempfile.mkdtemp()
     assets_dir = Path(assets_dir)
@@ -125,6 +128,12 @@ def create_databricks_config(path: str, package_name: str):
     template_params.write(json.dumps(config).encode())
     template_params.close()

+    config_path = metadata.project_path / "databricks.yml"
+    if config_path.exists():
+        raise FileExistsError(
+            f"{config_path} already exists. To reinitialize, delete the file and try again."
+        )
+
     # We utilize the databricks CLI to create the bundle configuration.
     # This is a bit hacky, but it allows the plugin to tap into the authentication
     # mechanism of the databricks CLI and thereby avoid the need to store credentials
@@ -138,7 +147,7 @@
             "--config-file",
             template_params.name,
             "--output-dir",
-            path,
+            metadata.project_path.as_posix(),
         ],
         stdout=subprocess.PIPE,
         check=False,
@@ -152,12 +161,14 @@
     shutil.rmtree(assets_dir)


-def write_default_config(path: str, default_key: str, package_name: str):
-    p = Path(path)
+def write_override_template(metadata: ProjectMetadata, default_key: str):
+    log = logging.getLogger(metadata.package_name)
+    log.info("Creating Databricks asset bundle override configuration...")
+    p = Path(metadata.project_path) / "conf" / "base" / "databricks.yml"
     if not p.exists():
         with open(p, "w") as f:
             f.write(
                 _bundle_override_template.format(
-                    default_key="default", package_name="package_name"
+                    default_key=default_key, package_name="package_name"
                 )
             )
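
The new `config_path` guard is the substance of this fix: `kedro databricks init` no longer re-runs `databricks bundle init` over an existing configuration. Sketched as a shell session (output paraphrased, not verbatim):

    kedro databricks init    # writes databricks.yml and conf/base/databricks.yml
    kedro databricks init    # second run now fails fast:
    # FileExistsError: <project_path>/databricks.yml already exists.
    # To reinitialize, delete the file and try again.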
31 changes: 15 additions & 16 deletions src/kedro_databricks/plugin.py
@@ -10,10 +10,9 @@
 from kedro.framework.session import KedroSession
 from kedro.framework.startup import ProjectMetadata

-from kedro_databricks import LOGGING_NAME
 from kedro_databricks.bundle import apply_resource_overrides, generate_resources
 from kedro_databricks.deploy import deploy_to_databricks
-from kedro_databricks.init import create_databricks_config, write_default_config
+from kedro_databricks.init import write_bundle_template, write_override_template

 DEFAULT_RUN_ENV = "dev"
 DEFAULT_CONFIG_KEY = "default"
@@ -32,7 +31,7 @@ def databricks_commands():


 def _load_config(context: KedroContext) -> dict[str, Any]:
-    log = logging.getLogger(LOGGING_NAME)
+    log = logging.getLogger(context._package_name)
     # Backwards compatibility for ConfigLoader that does not support `config_patterns`
     config_loader = context.config_loader
     if not hasattr(config_loader, "config_patterns"):
@@ -62,15 +61,8 @@ def init(
     default: str,
 ):
     """Initialize Databricks Asset Bundle configuration"""
-
-    # Load context to initialize logging
-    with KedroSession.create(project_path=metadata.project_path) as session:
-        session.load_context()
-
-    path = metadata.project_path
-    conf_path = path / "conf" / "base" / "databricks.yml"
-    create_databricks_config(path, metadata.package_name)
-    write_default_config(conf_path, default, metadata.package_name)
+    write_bundle_template(metadata)
+    write_override_template(metadata, default)


 @databricks_commands.command()
@@ -85,8 +77,8 @@ def bundle(
     overwrite: bool,
 ):
     """Convert kedro pipelines into Databricks asset bundle resources"""
-    log = logging.getLogger(LOGGING_NAME)
-    pipeline_resources = generate_resources(pipelines, metadata.package_name)
+    log = logging.getLogger(metadata.package_name)
+    pipeline_resources = generate_resources(pipelines, metadata)

     # If the configuration directory does not exist, Kedro will not load any configuration
     conf_dir = metadata.project_path / "conf" / env
@@ -125,16 +117,23 @@

 @databricks_commands.command()
 @click.option("-e", "--env", default=DEFAULT_RUN_ENV, help=ENV_HELP)
-@click.option("-b", "--bundle/--no-bundle", default=False, help=ENV_HELP)
+@click.option(
+    "-b",
+    "--bundle/--no-bundle",
+    default=False,
+    help="Bundle the project before deploying",
+)
@click.option("-b", "--debug/--no-debug", default=False, help="Enable debug mode")
 @click.pass_obj
 def deploy(
     metadata: ProjectMetadata,
     env: str,
     bundle: bool,
+    debug: bool,
 ):
     """Deploy the asset bundle to Databricks"""
     # Load context to initialize logging
-    deploy_to_databricks(metadata, env, bundle)
+    deploy_to_databricks(metadata, env, bundle, debug=debug)


 @databricks_commands.command()
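
Putting the new CLI surface together, a deploy that bundles first and forwards the new debug flag might look like this (long-form flags taken from the options above; `dev` is the default run environment):

    kedro databricks deploy --env dev --bundle --debug

With `--bundle`, `deploy_to_databricks` shells out to `kedro databricks bundle --env dev` before running `databricks bundle deploy --target dev`.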
4 changes: 0 additions & 4 deletions tests/conftest.py
@@ -7,18 +7,14 @@

 from __future__ import annotations

-import logging
 import os
 from pathlib import Path

 from click.testing import CliRunner
 from kedro.framework.cli.starters import create_cli as kedro_cli
 from kedro.framework.startup import bootstrap_project
-from kedro_databricks import LOGGING_NAME
 from pytest import fixture

-log = logging.getLogger(LOGGING_NAME)
-

 @fixture(name="cli_runner", scope="session")
 def cli_runner():
(Diffs for the remaining 2 changed files are not shown.)