Skip to content

Commit

Permalink
Merge pull request #128 from getindata/release-0.6.4
Browse files Browse the repository at this point in the history
Release 0.6.4
  • Loading branch information
em-pe authored Jun 1, 2022
2 parents 5e57a04 + 7b6bf3d commit ebb9452
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 5 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

## [0.6.4] - 2022-06-01

- Added support for specifying tolerations

## [0.6.3] - 2022-05-10

- KFP SDK version bumped to 1.8.11 in order to fix misbehaving TTL issue
Expand Down Expand Up @@ -133,7 +137,9 @@
- Method to schedule runs for most recent version of given pipeline `kedro kubeflow schedule`
- Shortcut to open UI for pipelines using `kedro kubeflow ui`

[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.6.3...HEAD
[Unreleased]: https://github.com/getindata/kedro-kubeflow/compare/0.6.4...HEAD

[0.6.4]: https://github.com/getindata/kedro-kubeflow/compare/0.6.3...0.6.4

[0.6.3]: https://github.com/getindata/kedro-kubeflow/compare/0.6.2...0.6.3

Expand Down
2 changes: 1 addition & 1 deletion docs/source/01_introduction/01_intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ like pod or volume) and edges (dependencies between the nodes, like passing outp
data as input). The pipelines are stored in the versioned database, allowing user
to run the pipeline once or schedule the recurring run.

## Why to integrate Kedro project with Pipelines?
## Why integrate a Kedro project with Pipelines?

Kubeflow Pipelines' main attitude is the portability. Once you define a pipeline,
it can be started on any Kubernetes cluster. The code to execute is stored inside
Expand Down
13 changes: 13 additions & 0 deletions docs/source/02_installation/02_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ run_config:
# Flak indicating if volume for inter-node data exchange should be
# kept after the pipeline is deleted
keep: False

# Optional section allowing adjustment of the tolerations for the nodes
tolerations:
__default__:
- key: "dedicated"
operator: "Equal"
value: "ml-ops"
effect: "NoSchedule"
node_a:
- key: "dedicated"
operator: "Equal"
value: "gpu_workload"
effect: "NoSchedule"

# Optional section allowing adjustment of the resources
# reservations and limits for the nodes
Expand Down
2 changes: 1 addition & 1 deletion kedro_kubeflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""kedro_kubeflow."""

version = "0.6.3"
version = "0.6.4"
30 changes: 30 additions & 0 deletions kedro_kubeflow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,21 @@
num_retries: 4
backoff_duration: 60s
backoff_factor: 2
# Optional section allowing adjustment of the resources
# reservations and limits for the nodes
# optional section for specifying tolerations per node.
# the __default__ section will be loaded if nothing is specified for a particular node.
tolerations:
__default__:
- key: "dedicated"
operator: "Equal"
value: "ml-ops"
effect: "NoSchedule"
node_a:
- key: "gpu_resource"
operator: "Equal"
value: "voltaire"
effect: "NoSchedule"
"""


Expand Down Expand Up @@ -192,6 +207,17 @@ def get_for(self, node_name):
return {**defaults, **node_specific}


class Tolerations(Config):
def is_set_for(self, node_name):
return bool(self.get_for(node_name))

def get_for(self, node_name):
node_values = self._get_or_default(node_name, [])
if node_values:
return node_values
return self._get_or_default("__default__", [])


class RetryPolicy(Config):
def is_set_for(self, node_name):
return self.get_for(node_name) != {}
Expand Down Expand Up @@ -251,6 +277,10 @@ def description(self):
def resources(self):
return NodeResources(self._get_or_default("resources", {}))

@property
def tolerations(self):
return Tolerations(self._get_or_default("tolerations", {}))

@property
def retry_policy(self):
return RetryPolicy(self._get_or_default("retry_policy", {}))
Expand Down
3 changes: 3 additions & 0 deletions kedro_kubeflow/generators/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,7 @@ def customize_op(op, image_pull_policy, run_config):
op.set_retry(
policy="Always", **run_config.retry_policy.get_for(op.name)
)
if run_config.tolerations.is_set_for(op.name):
for toleration in run_config.tolerations.get_for(op.name):
op.add_toleration(k8s.V1Toleration(**toleration))
return op
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.6.3
current_version = 0.6.4

[bumpversion:file:setup.py]

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

setup(
name="kedro-kubeflow",
version="0.6.3",
version="0.6.4",
description="Kedro plugin with Kubeflow support",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
68 changes: 68 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,74 @@ def test_resources_default_and_node_specific(self):
"memory": "64Mi",
}

def test_tolerations_default_only(self):
toleration_config = [
{
"key": "thekey",
"operator": "equal",
"value": "thevalue",
"effect": "NoSchedule",
}
]
cfg = PluginConfig(
{"run_config": {"tolerations": {"__default__": toleration_config}}}
)
assert cfg.run_config.tolerations.is_set_for("node2")
assert cfg.run_config.tolerations.get_for("node2") == toleration_config
assert cfg.run_config.tolerations.is_set_for("node3")
assert cfg.run_config.tolerations.get_for("node3") == toleration_config

def test_tolerations_no_default(self):
toleration_config = [
{
"key": "thekey",
"operator": "equal",
"value": "thevalue",
"effect": "NoSchedule",
}
]
cfg = PluginConfig(
{"run_config": {"tolerations": {"node2": toleration_config}}}
)
assert cfg.run_config.tolerations.is_set_for("node2")
assert cfg.run_config.tolerations.get_for("node2") == toleration_config
assert cfg.run_config.tolerations.is_set_for("node3") is False

def test_tolerations_default_and_node_specific(self):
toleration_config = [
{
"key": "thekey",
"operator": "equal",
"value": "thevalue",
"effect": "NoSchedule",
}
]
default_toleration_config = [
{
"key": "thekeyfordefault",
"operator": "equal",
"value": "thevaluefordefault",
"effect": "NoSchedule",
}
]
cfg = PluginConfig(
{
"run_config": {
"tolerations": {
"__default__": default_toleration_config,
"node2": toleration_config,
}
}
}
)
assert cfg.run_config.tolerations.is_set_for("node2")
assert cfg.run_config.tolerations.get_for("node2") == toleration_config
assert cfg.run_config.tolerations.is_set_for("node3")
assert (
cfg.run_config.tolerations.get_for("node3")
== default_toleration_config
)

def test_do_not_keep_volume_by_default(self):
cfg = PluginConfig({"run_config": {"volume": {}}})
assert cfg.run_config.volume.keep is False
Expand Down

0 comments on commit ebb9452

Please sign in to comment.