From 15a23ab47f72548f508617187db717c0676537cb Mon Sep 17 00:00:00 2001 From: Michal Brys <10577534+michalbrys@users.noreply.github.com> Date: Fri, 11 Mar 2022 13:09:16 +0100 Subject: [PATCH 1/7] Update 01_quickstart.md Switched to the official `spaceflights` since the required changes in Kedro are merged. --- docs/source/03_getting_started/01_quickstart.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/source/03_getting_started/01_quickstart.md b/docs/source/03_getting_started/01_quickstart.md index 89848bb..e0d5f07 100644 --- a/docs/source/03_getting_started/01_quickstart.md +++ b/docs/source/03_getting_started/01_quickstart.md @@ -21,13 +21,14 @@ $ source venv-demo/bin/activate Then, `kedro` must be present to enable cloning the starter project, along with the latest version of `kedro-kubeflow` plugina and kedro-docker (required to build docker images with the Kedro pipeline nodes): ``` -$ pip install 'kedro<0.17' kedro-kubeflow kedro-docker +$ pip install 'kedro<0.18' kedro-kubeflow kedro-docker ``` With the dependencies in place, let's create a new project: ``` -$ kedro new --starter=git+https://github.com/getindata/kedro-starter-spaceflights.git --checkout allow_nodes_with_commas +$ kedro new --starter=spaceflights + Project Name: ============= Please enter a human readable name for your new project. @@ -53,8 +54,6 @@ Change directory to the project generated in /home/mario/kedro/kubeflow-plugin-d A best-practice setup includes initialising git and creating a virtual environment before running `kedro install` to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/ ``` -> TODO: switch to the official `spaceflights` starter after https://github.com/quantumblacklabs/kedro-starter-spaceflights/pull/10 is merged - Finally, go the demo project directory and ensure that kedro-kubeflow plugin is activated: ```console From 0a2ffb188976bf5ad5816f958e89ebce2675838c Mon Sep 17 00:00:00 2001 From: Mateusz Pytel Date: Wed, 16 Mar 2022 23:19:53 +0100 Subject: [PATCH 2/7] Update README.md Fixed downloads badge, removed license scan badge. --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 18396c0..f04d409 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,11 @@ [![License](https://img.shields.io/badge/license-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![SemVer](https://img.shields.io/badge/semver-2.0.0-green)](https://semver.org/) [![PyPI version](https://badge.fury.io/py/kedro-kubeflow.svg)](https://pypi.org/project/kedro-kubeflow/) -[![Downloads](https://pepy.tech/badge/kedro-kubeflow)](https://pepy.tech/project/kedro-kubeflow) +[![Downloads](https://img.shields.io/pypi/dm/kedro-kubeflow)](https://img.shields.io/pypi/dm/kedro-kubeflow) [![Maintainability](https://api.codeclimate.com/v1/badges/fff07cbd2e5012a045a3/maintainability)](https://codeclimate.com/github/getindata/kedro-kubeflow/maintainability) [![Test Coverage](https://api.codeclimate.com/v1/badges/fff07cbd2e5012a045a3/test_coverage)](https://codeclimate.com/github/getindata/kedro-kubeflow/test_coverage) [![Documentation Status](https://readthedocs.org/projects/kedro-kubeflow/badge/?version=latest)](https://kedro-kubeflow.readthedocs.io/en/latest/?badge=latest) -[![FOSSA Status](https://app.fossa.com/api/projects/git%2Bgithub.com%2Fgetindata%2Fkedro-kubeflow.svg?type=shield)](https://app.fossa.com/projects/git%2Bgithub.com%2Fgetindata%2Fkedro-kubeflow?ref=badge_shield) ## About From 763d65766180ac9b1e8dfe8084b43ef5aa187c57 Mon Sep 17 00:00:00 2001 From: Mariusz Strzelecki Date: Thu, 24 Mar 2022 15:35:16 +0100 Subject: [PATCH 3/7] Use KFP 1.8.11 to fix TTL issues (#120) --- CHANGELOG.md | 2 ++ setup.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f093790..208ebd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased] +- KFP SDK version bumped to 1.8.11 in order to fix misbehaving TTL issue + ## [0.6.2] - 2022-03-10 - Added support for defining retry policy for the Kubeflow Pipelines nodes diff --git a/setup.py b/setup.py index ce56a0d..3c265e8 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ INSTALL_REQUIRES = [ "kedro>=0.16,<=0.18", "click<8.0", - "kfp~=1.8.0", + "kfp>=1.8.11,<2.0", "tabulate>=0.8.7", "semver~=2.10", "google-auth<2.0dev", From 752fe27d7c0990e96adb295ccd898a8efbc16eda Mon Sep 17 00:00:00 2001 From: Mariusz Strzelecki Date: Tue, 10 May 2022 12:26:17 +0200 Subject: [PATCH 4/7] Drop VertexAI support (#125) --- CHANGELOG.md | 1 + .../02_installation/02_configuration.md | 5 +- docs/source/03_getting_started/02_gcp.md | 60 +--- kedro_kubeflow/config.py | 26 +- kedro_kubeflow/context_helper.py | 19 +- kedro_kubeflow/vertex_ai/__init__.py | 1 - kedro_kubeflow/vertex_ai/client.py | 181 ----------- kedro_kubeflow/vertex_ai/generator.py | 280 ------------------ kedro_kubeflow/vertex_ai/io.py | 115 ------- setup.py | 5 +- tests/test_config.py | 27 -- tests/test_vertex_ai_client.py | 195 ------------ tests/test_vertex_ai_generator.py | 263 ---------------- 13 files changed, 13 insertions(+), 1165 deletions(-) delete mode 100644 kedro_kubeflow/vertex_ai/__init__.py delete mode 100644 kedro_kubeflow/vertex_ai/client.py delete mode 100644 kedro_kubeflow/vertex_ai/generator.py delete mode 100644 kedro_kubeflow/vertex_ai/io.py delete mode 100644 tests/test_vertex_ai_client.py delete mode 100644 tests/test_vertex_ai_generator.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 208ebd0..605040e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [Unreleased] - KFP SDK version bumped to 1.8.11 in order to fix misbehaving TTL issue +- Dropped support for VertexAI, please use [kedro-vertexi](https://kedro-kubeflow.readthedocs.io/en/latest/index.html) instead ## [0.6.2] - 2022-03-10 diff --git a/docs/source/02_installation/02_configuration.md b/docs/source/02_installation/02_configuration.md index 29a5663..43377b2 100644 --- a/docs/source/02_installation/02_configuration.md +++ b/docs/source/02_installation/02_configuration.md @@ -16,9 +16,6 @@ run_config: # on the same tag, or Never if you use only local images image_pull_policy: IfNotPresent - # Location of Vertex AI GCS root, required only for vertex ai pipelines configuration - root: bucket_name/gcs_suffix - # Name of the kubeflow experiment to be created experiment_name: Kubeflow Plugin Demo [${branch_name|local}] @@ -66,7 +63,7 @@ run_config: # is collapsed to one node. #node_merge_strategy: none - # Optional volume specification (only for non vertex-ai) + # Optional volume specification volume: # Storage class - use null (or no value) to use the default storage diff --git a/docs/source/03_getting_started/02_gcp.md b/docs/source/03_getting_started/02_gcp.md index c88e2e1..6f8a326 100644 --- a/docs/source/03_getting_started/02_gcp.md +++ b/docs/source/03_getting_started/02_gcp.md @@ -54,61 +54,7 @@ The above will work if you are connecting from within GCP VM or locally with spe service account credentials. It will *NOT* work for credentials obtained with `google cloud application-default login`. -### Using `kedro-kubeflow` with Vertex AI Pipelines (EXPERIMENTAL) - -[Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines) -is a fully managed service that allows to easily deploy -[Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) -on a serverless Google service. [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines) -was still in a Preview mode when this plugin version was released, therefore plugin -capability is also limited. - -##### 1. Preparing configuration - -In order the plugin picks Vertex AI Pipelines as a target infrastructure, it has to be indicated -in configuration. As the solution is serverless, no URL is to be provided. Instead, special set -of parameters has to be passed, so that connection is established with proper GCP service. - -```yaml -host: vertex-ai-pipelines -project_id: hosting-project -region: europe-west4 -run_config: - root: vertex-ai-pipelines-accessible-gcs-bucket/pipelines-specific-path -``` - -If the pipeline requires access to services that are not exposed to public internet, you need to configure [VPC peering between Vertex internal network and VPC that hosts the internal service](https://cloud.google.com/vertex-ai/docs/general/vpc-peering) and then set the VPC identifier in the configuration. Optionally, you can add custom host aliases: - -```yaml -run_config: - vertex_ai_networking: - vpc: projects/12345/global/networks/name-of-vpc - host_aliases: - - ip: 10.10.10.10 - hostnames: ['mlflow.internal'] - - ip: 10.10.20.20 - hostnames: ['featurestore.internal'] -``` - -##### 2. Preparing environment variables - -There're the following specific environment variables required for the pipeline to run correctly: - * SERVICE_ACCOUNT - full email of service account that job will use to run the pipeline. Account has -to have access to `run_config.root` path. Variable is optional, if no given, project compute account is used - * MLFLOW_TRACKING_TOKEN - identity token required if MLFlow is used inside the project and mlflow access - is protected. Token is passed as it is to kedro nodes in order to authenticate against MLFlow service. - Can be generated via `gcloud auth print-identity-token` command. - -#### 3. Supported commands - -Following commands are supported: - -```bash -kedro kubeflow compile -kedro kubeflow run-once -kedro kubeflow schedule -kedro kubeflow list-pipelines -``` - -![Vertex_AI_Pipelines](vertex_ai_pipelines.png) +### Using `kedro-kubeflow` with Vertex AI Pipelines (DEPRECATED) +Vertex AI Pipelines support in `kedro-kubeflow` has been deprecated in favour of the +new plugin [kedro-vertexai](https://kedro-vertexai.readthedocs.io/en/latest/) diff --git a/kedro_kubeflow/config.py b/kedro_kubeflow/config.py index 241579c..331cfc1 100644 --- a/kedro_kubeflow/config.py +++ b/kedro_kubeflow/config.py @@ -16,9 +16,6 @@ # on the same tag, or Never if you use only local images image_pull_policy: IfNotPresent - # Location of Vertex AI GCS root, required only for vertex ai pipelines configuration - #root: bucket_name/gcs_suffix - # Name of the kubeflow experiment to be created experiment_name: {project} @@ -66,7 +63,7 @@ # is collapsed to one node. #node_merge_strategy: none - # Optional volume specification (only for non vertex-ai) + # Optional volume specification volume: # Storage class - use null (or no value) to use the default storage @@ -156,17 +153,6 @@ def __eq__(self, other): return self._raw == other._raw -class VertexAiNetworkingConfig(Config): - @property - def vpc(self): - return self._get_or_default("vpc", None) - - @property - def host_aliases(self): - aliases = self._get_or_default("host_aliases", []) - return {alias["ip"]: alias["hostnames"] for alias in aliases} - - class VolumeConfig(Config): @property def storageclass(self): @@ -299,12 +285,6 @@ def ttl(self): def on_exit_pipeline(self): return self._get_or_default("on_exit_pipeline", None) - @property - def vertex_ai_networking(self): - return VertexAiNetworkingConfig( - self._get_or_default("vertex_ai_networking", {}) - ) - @property def node_merge_strategy(self): strategy = str(self._get_or_default("node_merge_strategy", "none")) @@ -341,10 +321,6 @@ def project_id(self): def region(self): return self._get_or_fail("region") - @property - def is_vertex_ai_pipelines(self): - return self.host == "vertex-ai-pipelines" - @staticmethod def initialize_github_actions(project_name, where, templates_dir): os.makedirs(where / ".github/workflows", exist_ok=True) diff --git a/kedro_kubeflow/context_helper.py b/kedro_kubeflow/context_helper.py index 331d891..5d9788a 100644 --- a/kedro_kubeflow/context_helper.py +++ b/kedro_kubeflow/context_helper.py @@ -68,20 +68,13 @@ def config(self) -> PluginConfig: @property @lru_cache() def kfp_client(self): - if self.config.is_vertex_ai_pipelines: - from .vertex_ai.client import VertexAIPipelinesClient + from .kfpclient import KubeflowClient - return VertexAIPipelinesClient( - self.config, self.project_name, self.context - ) - else: - from .kfpclient import KubeflowClient - - return KubeflowClient( - self.config, - self.project_name, - self.context, - ) + return KubeflowClient( + self.config, + self.project_name, + self.context, + ) @staticmethod def init(metadata, env): diff --git a/kedro_kubeflow/vertex_ai/__init__.py b/kedro_kubeflow/vertex_ai/__init__.py deleted file mode 100644 index 72c34ee..0000000 --- a/kedro_kubeflow/vertex_ai/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""kedro_kubeflow.vertex_ai""" diff --git a/kedro_kubeflow/vertex_ai/client.py b/kedro_kubeflow/vertex_ai/client.py deleted file mode 100644 index 09abf48..0000000 --- a/kedro_kubeflow/vertex_ai/client.py +++ /dev/null @@ -1,181 +0,0 @@ -""" -Vertex AI Pipelines specific client, based on AIPlatformClient. -""" - -import json -import logging -import os -from tempfile import NamedTemporaryFile - -from google.cloud.scheduler_v1.services.cloud_scheduler import ( - CloudSchedulerClient, -) -from kfp.v2 import compiler -from kfp.v2.google.client import AIPlatformClient -from tabulate import tabulate - -from .generator import PipelineGenerator - - -class VertexAIPipelinesClient: - """ - Client for Vertex AI Pipelines. - """ - - log = logging.getLogger(__name__) - - def __init__(self, config, project_name, context): - self.generator = PipelineGenerator(config, project_name, context) - self.api_client = AIPlatformClient( - project_id=config.project_id, region=config.region - ) - self.cloud_scheduler_client = CloudSchedulerClient() - self.location = ( - f"projects/{config.project_id}/locations/{config.region}" - ) - self.run_config = config.run_config - - def list_pipelines(self): - """ - List all the jobs (current and historical) on Vertex AI Pipelines - :return: - """ - pipelines = self.api_client.list_jobs()["pipelineJobs"] - return tabulate( - map(lambda x: [x.get("displayName"), x["name"]], pipelines), - headers=["Name", "ID"], - ) - - def run_once( - self, - pipeline, - image, - experiment_name, - run_name, - wait=False, - image_pull_policy="IfNotPresent", - experiment_namespace=None, - ): - """ - Runs the pipeline in Vertex AI Pipelines - :param pipeline: - :param image: - :param experiment_name: - :param run_name: - :param wait: - :param image_pull_policy: - :return: - """ - with NamedTemporaryFile( - mode="rt", prefix="kedro-kubeflow", suffix=".json" - ) as spec_output: - self.compile( - pipeline, - image, - output=spec_output.name, - image_pull_policy=image_pull_policy, - ) - - run = self.api_client.create_run_from_job_spec( - service_account=os.getenv("SERVICE_ACCOUNT"), - job_spec_path=spec_output.name, - job_id=run_name, - pipeline_root=f"gs://{self.run_config.root}", - parameter_values={}, - enable_caching=False, - network=self.run_config.vertex_ai_networking.vpc, - ) - self.log.info("Run created %s", str(run)) - return run - - def compile( - self, - pipeline, - image, - output, - image_pull_policy="IfNotPresent", - ): - """ - Creates json file in given local output path - :param pipeline: - :param image: - :param output: - :param image_pull_policy: - :return: - """ - token = os.getenv("MLFLOW_TRACKING_TOKEN") - pipeline_func = self.generator.generate_pipeline( - pipeline, image, image_pull_policy, token - ) - compiler.Compiler().compile( - pipeline_func=pipeline_func, - package_path=output, - ) - self.log.info( - "Generated pipeline definition was saved to %s", str(output) - ) - - def upload(self, pipeline, image, image_pull_policy="IfNotPresent"): - """ - Upload is not supported by Vertex AI Pipelines - :param pipeline: - :param image: - :param image_pull_policy: - :return: - """ - raise NotImplementedError("Upload is not supported for VertexAI") - - def _cleanup_old_schedule(self, pipeline_name): - """ - Removes old jobs scheduled for given pipeline name - """ - for job in self.cloud_scheduler_client.list_jobs(parent=self.location): - if "jobs/pipeline_pipeline" not in job.name: - continue - - job_pipeline_name = json.loads(job.http_target.body)[ - "pipelineSpec" - ]["pipelineInfo"]["name"] - if job_pipeline_name == pipeline_name: - self.log.info( - "Found existing schedule for the pipeline at %s, deleting...", - job.schedule, - ) - self.cloud_scheduler_client.delete_job(name=job.name) - - def schedule( - self, - pipeline, - experiment_name, - experiment_namespace, - cron_expression, - image_pull_policy="IfNotPresent", - ): - """ - Schedule pipeline to Vertex AI with given cron expression - :param pipeline: - :param experiment_name: - :param experiment_namespace: - :param cron_expression: - :param image_pull_policy: - :return: - """ - self._cleanup_old_schedule(self.generator.get_pipeline_name()) - with NamedTemporaryFile( - mode="rt", prefix="kedro-kubeflow", suffix=".json" - ) as spec_output: - self.compile( - pipeline, - self.run_config.image, - output=spec_output.name, - image_pull_policy=image_pull_policy, - ) - self.api_client.create_schedule_from_job_spec( - job_spec_path=spec_output.name, - time_zone="Etc/UTC", - schedule=cron_expression, - pipeline_root=f"gs://{self.run_config.root}", - enable_caching=False, - ) - - self.log.info("Pipeline scheduled to %s", cron_expression) diff --git a/kedro_kubeflow/vertex_ai/generator.py b/kedro_kubeflow/vertex_ai/generator.py deleted file mode 100644 index c0cdec0..0000000 --- a/kedro_kubeflow/vertex_ai/generator.py +++ /dev/null @@ -1,280 +0,0 @@ -""" -Generator for Vertex AI pipelines -""" - -import logging -from tempfile import NamedTemporaryFile -from typing import Dict, Set - -import kfp -from kedro.pipeline.node import Node -from kfp.components.structures import ( - ComponentSpec, - ContainerImplementation, - ContainerSpec, - InputSpec, - OutputPathPlaceholder, - OutputSpec, -) -from kfp.v2 import dsl - -from kedro_kubeflow.utils import clean_name, is_mlflow_enabled -from kedro_kubeflow.vertex_ai.io import ( - generate_inputs, - generate_mlflow_inputs, - generate_outputs, -) - - -class PipelineGenerator: - """ - Generator creates Vertex AI pipeline function that operatoes with Vertex AI specific - opertator spec. - """ - - log = logging.getLogger(__name__) - - def __init__(self, config, project_name, context): - self.project_name = project_name - self.context = context - self.run_config = config.run_config - self.catalog = context.config_loader.get("catalog*") - - def get_pipeline_name(self): - """ - Returns Vertex-compatible pipeline name - """ - return self.project_name.lower().replace(" ", "-") - - def generate_pipeline(self, pipeline, image, image_pull_policy, token): - """ - This method return @dsl.pipeline annotated function that contains - dynamically generated pipelines. - :param pipeline: kedro pipeline - :param image: full docker image name - :param image_pull_policy: docker pull policy - :param token: mlflow authentication token - :return: kfp pipeline function - """ - - def set_dependencies(node, dependencies, kfp_ops): - for dependency in dependencies: - name = clean_name(node.name) - dependency_name = clean_name(dependency.name) - kfp_ops[name].after(kfp_ops[dependency_name]) - - @dsl.pipeline( - name=self.get_pipeline_name(), - description=self.run_config.description, - ) - def convert_kedro_pipeline_to_kfp() -> None: - node_dependencies = self.context.pipelines.get( - pipeline - ).node_dependencies - kfp_ops = self._build_kfp_ops( - node_dependencies, image, pipeline, token - ) - for node, dependencies in node_dependencies.items(): - set_dependencies(node, dependencies, kfp_ops) - - if self.run_config.volume and not self.run_config.volume.skip_init: - self._create_data_volume_init_op(kfp_ops, image) - - for operator in kfp_ops.values(): - operator.container.set_image_pull_policy(image_pull_policy) - - return convert_kedro_pipeline_to_kfp - - def _generate_hosts_file(self): - host_aliases = self.run_config.vertex_ai_networking.host_aliases - return " ".join( - f"echo {ip}\t{' '.join(hostnames)} >> /etc/hosts;" - for ip, hostnames in host_aliases.items() - ) - - def _create_data_volume_init_op( - self, kfp_ops: Dict[str, dsl.ContainerOp], image: str - ): - data_volume_init = self._setup_volume_op(image) - for name, ops in kfp_ops.items(): - if name != "mlflow-start-run": - ops.after(data_volume_init) - kfp_ops["data-volume-init"] = data_volume_init - - def _create_mlflow_op(self, image, tracking_token) -> dsl.ContainerOp: - mlflow_command = " ".join( - [ - self._generate_hosts_file(), - "mkdir --parents " - "`dirname {{$.outputs.parameters['output'].output_file}}`", - "&&", - "MLFLOW_TRACKING_TOKEN={{$.inputs.parameters['mlflow_tracking_token']}} " - f"kedro kubeflow -e {self.context.env} mlflow-start " - "--output {{$.outputs.parameters['output'].output_file}} " - + self.run_config.run_name, - ] - ) - - spec = ComponentSpec( - name="mlflow-start-run", - inputs=[InputSpec("mlflow_tracking_token", "String")], - outputs=[OutputSpec("output", "String")], - implementation=ContainerImplementation( - container=ContainerSpec( - image=image, - command=["/bin/bash", "-c"], - args=[ - mlflow_command, - OutputPathPlaceholder(output_name="output"), - ], - ) - ), - ) - with NamedTemporaryFile( - mode="w", prefix="kedro-kubeflow-spec", suffix=".yaml" - ) as spec_file: - spec.save(spec_file.name) - component = kfp.components.load_component_from_file(spec_file.name) - return component(tracking_token) - - def _create_params_parameter(self) -> str: - params_parameter = ",".join( - [f"{key}:{value}" for key, value in self.context.params.items()] - ) - if params_parameter: - params_parameter = f"--params {params_parameter}" - return params_parameter - - def _build_kfp_ops( - self, - node_dependencies: Dict[Node, Set[Node]], - image, - pipeline, - tracking_token=None, - ) -> Dict[str, dsl.ContainerOp]: - """Build kfp container graph from Kedro node dependencies.""" - kfp_ops = {} - - if is_mlflow_enabled(): - kfp_ops["mlflow-start-run"] = self._create_mlflow_op( - image, tracking_token - ) - - params_parameter = self._create_params_parameter() - - for node in node_dependencies: - name = clean_name(node.name) - - ( - output_specs, - output_copy_commands, - output_placeholders, - ) = generate_outputs(node, self.catalog) - input_params, input_specs = generate_inputs( - node, node_dependencies, self.catalog - ) - mlflow_inputs, mlflow_tokens = generate_mlflow_inputs() - component_params = ( - [tracking_token, kfp_ops["mlflow-start-run"].output] - if is_mlflow_enabled() - else [] - ) - - kedro_command = " ".join( - [ - f"kedro run -e {self.context.env}", - f"--pipeline {pipeline}", - f"{params_parameter}", - f'--node "{node.name}"', - ] - ) - node_command = " ".join( - [ - self._generate_hosts_file(), - "rm -r /home/kedro/data" - "&&" - f"ln -s /gcs/{self._get_data_path()} /home/kedro/data" - "&&", - mlflow_tokens + kedro_command, - ] - ) - spec = ComponentSpec( - name=name, - inputs=mlflow_inputs + input_specs, - outputs=output_specs, - implementation=ContainerImplementation( - container=ContainerSpec( - image=image, - command=["/bin/bash", "-c"], - args=[node_command + " " + output_copy_commands] - + output_placeholders, - ) - ), - ) - kfp_ops[name] = self._create_kedro_op( - name, spec, component_params + input_params - ) - - return kfp_ops - - def _create_kedro_op( - self, name: str, spec: ComponentSpec, op_function_parameters - ): - with NamedTemporaryFile( - mode="w", prefix="kedro-kubeflow-node-spec", suffix=".yaml" - ) as spec_file: - spec.save(spec_file.name) - component = kfp.components.load_component_from_file(spec_file.name) - - operator = component(*op_function_parameters) - - self._configure_resources(name, operator) - return operator - - def _configure_resources(self, name: str, operator): - resources = self.run_config.resources.get_for(name) - if "cpu" in resources: - operator.set_cpu_limit(resources["cpu"]) - operator.set_cpu_request(resources["cpu"]) - if "memory" in resources: - operator.set_memory_limit(resources["memory"]) - operator.set_memory_request(resources["memory"]) - if "cloud.google.com/gke-accelerator" in resources: - operator.add_node_selector_constraint( - "cloud.google.com/gke-accelerator", - resources["cloud.google.com/gke-accelerator"], - ) - if "nvidia.com/gpu" in resources: - operator.set_gpu_limit(resources["nvidia.com/gpu"]) - - def _get_data_path(self): - return ( - f"{self.run_config.root}/" - f"{self.run_config.experiment_name}/{self.run_config.run_name}/data" - ) - - def _setup_volume_op(self, image): - command = " ".join( - [ - f"mkdir --parents /gcs/{self._get_data_path()} &&", - f"cp -r /home/kedro/data/* /gcs/{self._get_data_path()}", - ] - ) - spec = ComponentSpec( - name="data-volume-init", - inputs=[], - implementation=ContainerImplementation( - container=ContainerSpec( - image=image, command=["/bin/bash", "-c"], args=[command] - ) - ), - ) - - with NamedTemporaryFile( - mode="w", prefix="kedro-kubeflow-data-volume-init", suffix=".yaml" - ) as spec_file: - spec.save(spec_file.name) - component = kfp.components.load_component_from_file(spec_file.name) - volume_init = component() - - return volume_init diff --git a/kedro_kubeflow/vertex_ai/io.py b/kedro_kubeflow/vertex_ai/io.py deleted file mode 100644 index e3795ac..0000000 --- a/kedro_kubeflow/vertex_ai/io.py +++ /dev/null @@ -1,115 +0,0 @@ -""" -Pipeline input and output helper methods for spec generation -""" - -from typing import Dict, Set - -import kfp -from kedro.pipeline.node import Node -from kfp.components import structures - -from kedro_kubeflow.utils import clean_name, is_mlflow_enabled - - -def _find_input_node(input_name, nodes): - return [node for node in nodes if input_name in node.outputs] - - -def generate_inputs( - node: Node, node_dependencies: Dict[Node, Set[Node]], catalog -): - """ - Generates inputs for a particular kedro node - """ - - def is_file_path_input(input_data): - return ( - input_data in catalog - and "filepath" in catalog[input_data] - and ":/" not in catalog[input_data]["filepath"] - ) - - input_mapping = { - i: catalog[i]["filepath"] for i in node.inputs if is_file_path_input(i) - } - - input_params_mapping = {} - for input_name in input_mapping: - input_node = _find_input_node(input_name, node_dependencies) - if input_node: - input_params_mapping[input_name] = input_node[0] - - input_params = [ - kfp.dsl.PipelineParam( - name=i, - op_name=clean_name(input_params_mapping[i].name), - param_type="Dataset", - ) - for i in input_params_mapping - ] - input_specs = [ - structures.InputSpec(param.name, "Dataset") for param in input_params - ] - - return input_params, input_specs - - -def get_output_type(output, catalog): - """ - Returns Vertex output type based on the layer in Kedro catalog - """ - if catalog[output].get("layer") == "models": - return "Model" - return "Dataset" - - -def generate_outputs(node: Node, catalog): - """ - Generates outputs for a particular kedro node - """ - data_mapping = { - o: catalog[o]["filepath"] - for o in node.outputs - if o in catalog - and "filepath" in catalog[o] - and ":/" not in catalog[o]["filepath"] - } - output_specs = [ - structures.OutputSpec(o, get_output_type(o, catalog)) - for o in data_mapping.keys() - ] - output_copy_commands = " ".join( - [ - f"&& mkdir --parents `dirname {{{{$.outputs.artifacts['{o}'].path}}}}` " - f"&& cp /home/kedro/{filepath} {{{{$.outputs.artifacts['{o}'].path}}}}" - for o, filepath in data_mapping.items() - ] - ) - output_placeholders = [ - structures.OutputPathPlaceholder(output_name=o) - for o in data_mapping.keys() - ] - return output_specs, output_copy_commands, output_placeholders - - -def generate_mlflow_inputs(): - """ - Generates inputs that are required to correctly generate mlflow specific data. - :return: mlflow_inputs, mlflow_tokens - """ - mlflow_inputs = ( - [ - structures.InputSpec("mlflow_tracking_token", "String"), - structures.InputSpec("mlflow_run_id", "String"), - ] - if is_mlflow_enabled() - else [] - ) - mlflow_tokens = ( - "MLFLOW_TRACKING_TOKEN={{$.inputs.parameters['mlflow_tracking_token']}} " - "MLFLOW_RUN_ID=\"{{$.inputs.parameters['mlflow_run_id']}}\" " - if is_mlflow_enabled() - else "" - ) - - return mlflow_inputs, mlflow_tokens diff --git a/setup.py b/setup.py index 3c265e8..48d50c4 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ # Runtime Requirements. INSTALL_REQUIRES = [ - "kedro>=0.16,<=0.18", + "kedro>=0.16,<0.18", "click<8.0", "kfp>=1.8.11,<2.0", "tabulate>=0.8.7", @@ -30,9 +30,6 @@ "recommonmark==0.7.1", "sphinx_rtd_theme==0.6.2", ], - "vertexai": [ - "google-cloud-scheduler>=2.3.2", - ], } setup( diff --git a/tests/test_config.py b/tests/test_config.py index bbd5855..75d9b51 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -24,18 +24,6 @@ keep: True """ -VERTEX_YAML = """ -host: vertex-ai-pipelines -project_id: some-project -region: some-region -run_config: - vertex_ai_networking: - vpc: projects/some-project-id/global/networks/some-vpc-name - host_aliases: - - ip: 10.10.10.10 - hostnames: ['mlflow.internal'] -""" - class TestPluginConfig(unittest.TestCase): def test_plugin_config(self): @@ -111,21 +99,6 @@ def test_do_not_keep_volume_by_default(self): cfg = PluginConfig({"run_config": {"volume": {}}}) assert cfg.run_config.volume.keep is False - def test_parse_vertex_ai_networking_config(self): - cfg = PluginConfig(yaml.safe_load(VERTEX_YAML)) - assert ( - cfg.run_config.vertex_ai_networking.vpc - == "projects/some-project-id/global/networks/some-vpc-name" - ) - assert cfg.run_config.vertex_ai_networking.host_aliases == { - "10.10.10.10": ["mlflow.internal"] - } - - def test_accept_default_vertex_ai_networking_config(self): - cfg = PluginConfig({"run_config": {}}) - assert cfg.run_config.vertex_ai_networking.vpc is None - assert cfg.run_config.vertex_ai_networking.host_aliases == {} - def test_reuse_run_name_for_scheduled_run_name(self): cfg = PluginConfig({"run_config": {"run_name": "some run"}}) assert cfg.run_config.run_name == "some run" diff --git a/tests/test_vertex_ai_client.py b/tests/test_vertex_ai_client.py deleted file mode 100644 index 44e5b92..0000000 --- a/tests/test_vertex_ai_client.py +++ /dev/null @@ -1,195 +0,0 @@ -"""Test kedro_kubeflow module.""" - -import unittest -from unittest.mock import MagicMock, patch - -from kedro_kubeflow.config import PluginConfig -from kedro_kubeflow.utils import strip_margin -from kedro_kubeflow.vertex_ai.client import VertexAIPipelinesClient - - -class TestKubeflowClient(unittest.TestCase): - @patch("kedro_kubeflow.vertex_ai.client.CloudSchedulerClient") - def create_client(self, cloud_scheduler_client_mock): - self.cloud_scheduler_client_mock = ( - cloud_scheduler_client_mock.return_value - ) - config = PluginConfig( - { - "project_id": "PROJECT_ID", - "region": "REGION", - "run_config": { - "image": "IMAGE", - "root": "BUCKET/PREFIX", - "vertex_ai_networking": {"vpc": "my-vpc"}, - }, - } - ) - return VertexAIPipelinesClient(config, MagicMock(), MagicMock()) - - def test_compile(self): - with patch( - "kedro_kubeflow.vertex_ai.generator.PipelineGenerator" - ), patch("kedro_kubeflow.vertex_ai.client.AIPlatformClient"), patch( - "kfp.v2.compiler.Compiler" - ) as Compiler: - compiler = Compiler.return_value - - client_under_test = self.create_client() - client_under_test.compile( - MagicMock("pipeline"), "image", "some_path" - ) - - compiler.compile.assert_called_once() - - def test_upload_not_supported_by_vertex_ai(self): - with patch( - "kedro_kubeflow.vertex_ai.generator.PipelineGenerator" - ), patch("kedro_kubeflow.vertex_ai.client.AIPlatformClient"): - client_under_test = self.create_client() - - with self.assertRaises(NotImplementedError): - client_under_test.upload(MagicMock("pipeline"), "image") - - def test_run_once(self): - with patch( - "kedro_kubeflow.vertex_ai.generator.PipelineGenerator" - ), patch( - "kedro_kubeflow.vertex_ai.client.AIPlatformClient" - ) as AIPlatformClient, patch( - "kfp.v2.compiler.Compiler" - ): - ai_client = AIPlatformClient.return_value - - run_mock = {"run": "mock"} - ai_client.create_run_from_job_spec.return_value = run_mock - client_under_test = self.create_client() - run = client_under_test.run_once( - MagicMock("pipeline"), "image", None, "test-run" - ) - - assert run_mock == run - _, kwargs = ai_client.create_run_from_job_spec.call_args - assert kwargs["network"] == "my-vpc" - - def test_should_list_pipelines(self): - with patch( - "kedro_kubeflow.vertex_ai.client.AIPlatformClient" - ) as AIPlatformClient: - ai_client = AIPlatformClient.return_value - ai_client.list_jobs.return_value = { - "pipelineJobs": [ - { - "displayName": "run1", - "name": "projects/29350373243/locations/" - "europe-west4/pipelineJobs/run1", - }, - { - "displayName": "run2", - "name": "projects/29350373243/locations/" - "europe-west4/pipelineJobs/run2", - }, - { - "name": "projects/123/locations/" - "europe-west4/pipelineJobs/no-display-name", - }, - ] - } - - client_under_test = self.create_client() - tabulation = client_under_test.list_pipelines() - - expected_output = """ - |Name ID - |------ ---------------------------------------------------------------- - |run1 projects/29350373243/locations/europe-west4/pipelineJobs/run1 - |run2 projects/29350373243/locations/europe-west4/pipelineJobs/run2 - | projects/123/locations/europe-west4/pipelineJobs/no-display-name""" - assert tabulation == strip_margin(expected_output) - - def test_should_schedule_pipeline(self): - with patch( - "kedro_kubeflow.vertex_ai.generator.PipelineGenerator" - ), patch( - "kedro_kubeflow.vertex_ai.client.AIPlatformClient" - ) as AIPlatformClient, patch( - "kfp.v2.compiler.Compiler" - ): - ai_client = AIPlatformClient.return_value - - client_under_test = self.create_client() - client_under_test.schedule( - MagicMock("pipeline"), None, None, "0 0 12 * *" - ) - - ai_client.create_schedule_from_job_spec.assert_called_once() - args, kwargs = ai_client.create_schedule_from_job_spec.call_args - assert kwargs["time_zone"] == "Etc/UTC" - assert kwargs["enable_caching"] is False - assert kwargs["schedule"] == "0 0 12 * *" - assert kwargs["pipeline_root"] == "gs://BUCKET/PREFIX" - - def test_should_remove_old_schedule(self): - def mock_job(job_name, pipeline_name=None): - if pipeline_name: - body = ( - '{"pipelineSpec": {"pipelineInfo": {"name": "' - + pipeline_name - + '"}}}' - ) - else: - body = "" - return type( - "obj", - (object,), - { - "schedule": "* * * * *", - "name": job_name, - "http_target": type("obj", (object,), {"body": body}), - }, - ) - - with patch( - "kedro_kubeflow.vertex_ai.client.PipelineGenerator" - ) as generator, patch( - "kedro_kubeflow.vertex_ai.client.AIPlatformClient" - ) as AIPlatformClient, patch( - "kfp.v2.compiler.Compiler" - ): - # given - ai_client = AIPlatformClient.return_value - client_under_test = self.create_client() - generator.return_value.get_pipeline_name.return_value = ( - "unittest-pipeline" - ) - self.cloud_scheduler_client_mock.list_jobs.return_value = [ - # not removed (some other job) - mock_job(job_name="some-job"), - # not removed (some other pipeline) - mock_job( - job_name="projects/.../locations/.../jobs/pipeline_pipeline_abc", - pipeline_name="some-other-pipeline", - ), - # removed - mock_job( - job_name="projects/.../locations/.../jobs/pipeline_pipeline_def", - pipeline_name="unittest-pipeline", - ), - ] - - # when - client_under_test.schedule( - MagicMock("pipeline"), None, None, "0 0 12 * *" - ) - - # then - ai_client.create_schedule_from_job_spec.assert_called_once() - self.cloud_scheduler_client_mock.delete_job.assert_called_once() - ( - args, - kwargs, - ) = self.cloud_scheduler_client_mock.delete_job.call_args - assert ( - kwargs["name"] - == "projects/.../locations/.../jobs/pipeline_pipeline_def" - ) diff --git a/tests/test_vertex_ai_generator.py b/tests/test_vertex_ai_generator.py deleted file mode 100644 index 64bbfa9..0000000 --- a/tests/test_vertex_ai_generator.py +++ /dev/null @@ -1,263 +0,0 @@ -"""Test generator""" - -import unittest -from unittest.mock import MagicMock - -import kfp -from kedro.pipeline import Pipeline, node -from kfp.dsl import PipelineParam - -from kedro_kubeflow.config import PluginConfig -from kedro_kubeflow.vertex_ai.generator import PipelineGenerator - - -def identity(input1: str): - return input1 # pragma: no cover - - -class TestGenerator(unittest.TestCase): - def create_pipeline(self): - return Pipeline( - [ - node(identity, "A", "B", name="node1"), - node(identity, "B", "C", name="node2"), - ] - ) - - def test_support_modification_of_pull_policy(self): - # given - self.create_generator() - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Never", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - assert dsl_pipeline.ops["node1"].container.image == "unittest-image" - assert dsl_pipeline.ops["node1"].container.image_pull_policy == "Never" - - def test_should_skip_volume_init_if_requested(self): - # given - self.create_generator(config={"volume": {"skip_init": True}}) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - assert len(dsl_pipeline.ops) == 2 - assert "data-volume-init" not in dsl_pipeline.ops - for node_name in ["node1", "node2"]: - assert not dsl_pipeline.ops[node_name].container.volume_mounts - - def test_should_not_add_resources_spec_if_not_requested(self): - # given - self.create_generator(config={}) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - for node_name in ["node1", "node2"]: - spec = dsl_pipeline.ops[node_name].container - assert spec.resources is None - - def test_should_add_resources_spec(self): - # given - self.create_generator( - config={ - "resources": { - "__default__": {"cpu": "100m"}, - "node1": {"cpu": "400m", "memory": "64Gi"}, - } - } - ) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - node1_spec = dsl_pipeline.ops["node1"].container.resources - node2_spec = dsl_pipeline.ops["node2"].container.resources - assert node1_spec.limits == {"cpu": "400m", "memory": "64Gi"} - assert node1_spec.requests == {"cpu": "400m", "memory": "64Gi"} - assert node2_spec.limits == {"cpu": "100m"} - assert node2_spec.requests == {"cpu": "100m"} - - def test_should_set_description(self): - # given - self.create_generator(config={"description": "DESC"}) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Never", "MLFLOW_TRACKING_TOKEN" - ) - - # then - assert pipeline._component_description == "DESC" - - def test_artifact_registration(self): - # given - self.create_generator( - catalog={ - "B": { - "type": "pandas.CSVDataSet", - "filepath": "data/02_intermediate/b.csv", - }, - "C": { - "type": "pickle.PickleDataSet", - "filepath": "data/06_models/model.pkl", - "layer": "models", - }, - } - ) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - outputs1 = dsl_pipeline.ops["node1"].outputs - assert len(outputs1) == 2 - assert "B" in outputs1 - assert outputs1["B"] == PipelineParam( - name="B", op_name="node1", param_type="Dataset" - ) - outputs2 = dsl_pipeline.ops["node2"].outputs - assert outputs2["C"].param_type == "Model" - - def test_should_skip_volume_removal_if_requested(self): - # given - self.create_generator(config={"volume": {"keep": True}}) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Always", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - assert "schedule-volume-termination" not in dsl_pipeline.ops - - def test_should_add_env_and_pipeline_in_the_invocations(self): - # given - self.create_generator() - self.mock_mlflow(True) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Never", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - assert ( - "kedro kubeflow -e unittests mlflow-start" - in dsl_pipeline.ops["mlflow-start-run"].container.args[0] - ) - assert ( - 'kedro run -e unittests --pipeline pipeline --node "node1"' - in dsl_pipeline.ops["node1"].container.args[0] - ) - - def test_should_add_host_aliases_if_requested(self): - # given - self.create_generator( - config={ - "vertex_ai_networking": { - "host_aliases": [ - { - "ip": "10.10.10.10", - "hostnames": ["mlflow.internal", "mlflow.cloud"], - } - ] - } - } - ) - self.mock_mlflow(True) - - # when - pipeline = self.generator_under_test.generate_pipeline( - "pipeline", "unittest-image", "Never", "MLFLOW_TRACKING_TOKEN" - ) - with kfp.dsl.Pipeline(None) as dsl_pipeline: - pipeline() - - # then - hosts_entry_cmd = ( - "echo 10.10.10.10\tmlflow.internal mlflow.cloud >> /etc/hosts;" - ) - assert ( - hosts_entry_cmd - in dsl_pipeline.ops["mlflow-start-run"].container.args[0] - ) - assert hosts_entry_cmd in dsl_pipeline.ops["node1"].container.args[0] - - def mock_mlflow(self, enabled=False): - def fakeimport(name, *args, **kw): - if not enabled and name == "mlflow": - raise ImportError - return self.realimport(name, *args, **kw) - - __builtins__["__import__"] = fakeimport - - def setUp(self): - self.realimport = __builtins__["__import__"] - self.mock_mlflow(False) - - def tearDown(self): - __builtins__["__import__"] = self.realimport - - def create_generator(self, config={}, params={}, catalog={}): - project_name = "my-awesome-project" - config_loader = MagicMock() - config_loader.get.return_value = catalog - context = type( - "obj", - (object,), - { - "env": "unittests", - "params": params, - "config_loader": config_loader, - "pipelines": { - "pipeline": Pipeline( - [ - node(identity, "A", "B", name="node1"), - node(identity, "B", "C", name="node2"), - ] - ) - }, - }, - ) - config_with_defaults = { - "root": "sample-bucket/sample-suffix", - "experiment_name": "test-experiment", - "run_name": "test-run", - } - config_with_defaults.update(config) - self.generator_under_test = PipelineGenerator( - PluginConfig( - {"host": "http://unittest", "run_config": config_with_defaults} - ), - project_name, - context, - ) From f81b32f487164373095504c1ae17dc0f94bcc38b Mon Sep 17 00:00:00 2001 From: Mariusz Strzelecki Date: Tue, 10 May 2022 12:55:29 +0200 Subject: [PATCH 5/7] Add Kedro env to pipeline name during upload (#124) --- CHANGELOG.md | 1 + kedro_kubeflow/cli.py | 2 ++ kedro_kubeflow/context_helper.py | 4 ++++ kedro_kubeflow/kfpclient.py | 13 ++++++------ tests/test_cli.py | 8 ++++++- tests/test_kfpclient.py | 36 +++++++++++++++++++++++++++++++- 6 files changed, 56 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 605040e..d2aa2d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - KFP SDK version bumped to 1.8.11 in order to fix misbehaving TTL issue - Dropped support for VertexAI, please use [kedro-vertexi](https://kedro-kubeflow.readthedocs.io/en/latest/index.html) instead +- Add Kedro environment name to the pipeline name during upload ## [0.6.2] - 2022-03-10 diff --git a/kedro_kubeflow/cli.py b/kedro_kubeflow/cli.py index 96a9c42..fceb958 100644 --- a/kedro_kubeflow/cli.py +++ b/kedro_kubeflow/cli.py @@ -173,6 +173,7 @@ def upload_pipeline(ctx, image, pipeline) -> None: pipeline_name=pipeline, image=image if image else config.image, image_pull_policy=config.image_pull_policy, + env=ctx.obj["context_helper"].env, ) @@ -236,6 +237,7 @@ def schedule( cron_expression, run_name=config.scheduled_run_name, parameters=format_params(params), + env=ctx.obj["context_helper"].env, ) diff --git a/kedro_kubeflow/context_helper.py b/kedro_kubeflow/context_helper.py index 5d9788a..faeded5 100644 --- a/kedro_kubeflow/context_helper.py +++ b/kedro_kubeflow/context_helper.py @@ -53,6 +53,10 @@ def session(self): return KedroSession.create(self._metadata.package_name, env=self._env) + @property + def env(self): + return self._env + @property def context(self): return self.session.load_context() diff --git a/kedro_kubeflow/kfpclient.py b/kedro_kubeflow/kfpclient.py index fd55262..239cfb8 100644 --- a/kedro_kubeflow/kfpclient.py +++ b/kedro_kubeflow/kfpclient.py @@ -94,15 +94,15 @@ def compile( ) self.log.info("Generated pipeline definition was saved to %s" % output) - def get_full_pipeline_name(self, pipeline_name): - return f"[{self.project_name}] {pipeline_name}" + def get_full_pipeline_name(self, pipeline_name, env): + return f"[{self.project_name}] {pipeline_name} (env: {env})"[:100] - def upload(self, pipeline_name, image, image_pull_policy="IfNotPresent"): + def upload(self, pipeline_name, image, image_pull_policy, env): pipeline = self.generator.generate_pipeline( pipeline_name, image, image_pull_policy ) - full_pipeline_name = self.get_full_pipeline_name(pipeline_name) + full_pipeline_name = self.get_full_pipeline_name(pipeline_name, env) if self._pipeline_exists(full_pipeline_name): pipeline_id = self.client.get_pipeline_id(full_pipeline_name) version_id = self._upload_pipeline_version(pipeline, pipeline_id) @@ -169,13 +169,14 @@ def schedule( experiment_namespace, cron_expression, run_name, - parameters={}, + parameters, + env, ): experiment_id = self._ensure_experiment_exists( experiment_name, experiment_namespace ) pipeline_id = self.client.get_pipeline_id( - self.get_full_pipeline_name(pipeline) + self.get_full_pipeline_name(pipeline, env) ) formatted_run_name = run_name.format(**parameters) self._disable_runs(experiment_id, formatted_run_name) diff --git a/tests/test_cli.py b/tests/test_cli.py index 82623d2..08b8e09 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -119,6 +119,7 @@ def test_compile(self): def test_upload_pipeline(self): context_helper = MagicMock(ContextHelper) context_helper.config = test_config + context_helper.env = "kubeflow-env" config = dict(context_helper=context_helper) runner = CliRunner() @@ -128,12 +129,16 @@ def test_upload_pipeline(self): assert result.exit_code == 0 context_helper.kfp_client.upload.assert_called_with( - image="img", image_pull_policy="Always", pipeline_name="pipe" + image="img", + image_pull_policy="Always", + pipeline_name="pipe", + env="kubeflow-env", ) def test_schedule(self): context_helper = MagicMock(ContextHelper) context_helper.config = test_config + context_helper.env = "kubeflow-env" config = dict(context_helper=context_helper) runner = CliRunner() @@ -160,6 +165,7 @@ def test_schedule(self): "* * *", run_name="test run", parameters={"key1": "some value"}, + env="kubeflow-env", ) @patch.object(Path, "cwd") diff --git a/tests/test_kfpclient.py b/tests/test_kfpclient.py index 50eb087..aeef841 100644 --- a/tests/test_kfpclient.py +++ b/tests/test_kfpclient.py @@ -230,6 +230,8 @@ def test_should_schedule_pipeline(self): cron_expression="0 * * * * *", experiment_namespace=None, run_name="scheduled run of pipeline X", + parameters={}, + env="kubeflow-env", ) # then @@ -260,6 +262,8 @@ def test_should_schedule_pipeline_and_create_experiment_if_needed(self): cron_expression="0 * * * * *", experiment_namespace=None, run_name="pipeline X", + parameters={}, + env="kubeflow-env", ) # then @@ -291,6 +295,7 @@ def test_should_disable_old_runs_before_schedule(self): experiment_namespace=None, run_name="scheduled run for region {region}", parameters={"region": "ABC"}, + env="kubeflow-env", ) # then @@ -315,6 +320,7 @@ def test_should_upload_new_pipeline(self): pipeline_name="pipeline_name", image="unittest-image", image_pull_policy="Always", + env="kubeflow-env", ) # then @@ -324,9 +330,36 @@ def test_should_upload_new_pipeline(self): args, kwargs, ) = self.kfp_client_mock.pipeline_uploads.upload_pipeline.call_args - assert kwargs["name"] == "[my-awesome-project] pipeline_name" + assert ( + kwargs["name"] + == "[my-awesome-project] pipeline_name (env: kubeflow-env)" + ) assert kwargs["description"] == "Very Important Pipeline" + def test_should_truncated_the_pipeline_name_to_100_characters_on_upload( + self, + ): + # given + self.create_client({"description": "Very Important Pipeline"}) + self.kfp_client_mock.get_pipeline_id.return_value = None + + # when + self.client_under_test.upload( + pipeline_name="pipeline_name", + image="unittest-image", + image_pull_policy="Always", + env="kubeflow-env" + "1" * 100, + ) + + # then + self.kfp_client_mock.pipeline_uploads.upload_pipeline.assert_called() + self.kfp_client_mock.pipeline_uploads.upload_pipeline_version.assert_not_called() + ( + args, + kwargs, + ) = self.kfp_client_mock.pipeline_uploads.upload_pipeline.call_args + assert len(kwargs["name"]) == 100 + def test_should_upload_new_version_of_existing_pipeline(self): # given self.kfp_client_mock.get_pipeline_id.return_value = "123" @@ -336,6 +369,7 @@ def test_should_upload_new_version_of_existing_pipeline(self): pipeline_name="pipeline", image="unittest-image", image_pull_policy="Always", + env="kubeflow-env", ) # then From ba87e39a027966e98083d58e4040902d6fddcb47 Mon Sep 17 00:00:00 2001 From: github-actions Date: Tue, 10 May 2022 10:56:26 +0000 Subject: [PATCH 6/7] FIX #126 - Bump version and CHANGELOG for release 0.6.3 --- CHANGELOG.md | 6 +++++- kedro_kubeflow/__init__.py | 2 +- setup.cfg | 2 +- setup.py | 4 ++-- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d2aa2d0..9ea336b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## [Unreleased] +## [0.6.3] - 2022-05-10 + - KFP SDK version bumped to 1.8.11 in order to fix misbehaving TTL issue - Dropped support for VertexAI, please use [kedro-vertexi](https://kedro-kubeflow.readthedocs.io/en/latest/index.html) instead - Add Kedro environment name to the pipeline name during upload @@ -131,7 +133,9 @@ - Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule` - Shortcut to open UI for pipelines using `kedro kubeflow ui` -[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.6.2...HEAD +[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.6.3...HEAD + +[0.6.3]: https://github.com/getindata/kedro-kubeflow/compare/0.6.2...0.6.3 [0.6.2]: https://github.com/getindata/kedro-kubeflow/compare/0.6.1...0.6.2 diff --git a/kedro_kubeflow/__init__.py b/kedro_kubeflow/__init__.py index 24cba0f..c298fab 100644 --- a/kedro_kubeflow/__init__.py +++ b/kedro_kubeflow/__init__.py @@ -1,3 +1,3 @@ """kedro_kubeflow.""" -version = "0.6.2" +version = "0.6.3" diff --git a/setup.cfg b/setup.cfg index afa7bd2..01a4825 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.6.2 +current_version = 0.6.3 [bumpversion:file:setup.py] diff --git a/setup.py b/setup.py index 48d50c4..e25e973 100644 --- a/setup.py +++ b/setup.py @@ -28,13 +28,13 @@ "docs": [ "sphinx==3.4.2", "recommonmark==0.7.1", - "sphinx_rtd_theme==0.6.2", + "sphinx_rtd_theme==0.6.3", ], } setup( name="kedro-kubeflow", - version="0.6.2", + version="0.6.3", description="Kedro plugin with Kubeflow support", long_description=README, long_description_content_type="text/markdown", From e5a7e98ba53095bb10f199f3a537ce098091c1c3 Mon Sep 17 00:00:00 2001 From: Mariusz Strzelecki Date: Tue, 10 May 2022 13:00:46 +0200 Subject: [PATCH 7/7] Trigger build