Skip to content

Commit

Permalink
Add option for gator hub (#117)
Browse files Browse the repository at this point in the history
Updates bw for gator changes, and adds option to point to a running hub
  • Loading branch information
Kotarski authored Dec 9, 2024
1 parent f458f77 commit be7d6e8
Show file tree
Hide file tree
Showing 5 changed files with 1,222 additions and 1,038 deletions.
1 change: 1 addition & 0 deletions blockwork/config/blockwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Blockwork:
host_scratch: str = "../{project}.scratch"
host_state: str = "../{project}.state"
host_tools: str = "../{project}.tools"
hub_url: str | None = None
config: list[str] = field(default_factory=list)
bootstrap: list[str] = field(default_factory=list)
tooldefs: list[str] = field(default_factory=list)
Expand Down
5 changes: 5 additions & 0 deletions blockwork/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import hashlib
import importlib
import logging
import os
import platform
import sys
from datetime import datetime
Expand Down Expand Up @@ -248,6 +249,10 @@ def caching_forced(self):
"""
return self.__force_cache

@property
def hub_url(self):
    """Resolve the Gator hub URL.

    The ``BW_HUB_URL`` environment variable takes precedence; when it is
    unset, fall back to the ``hub_url`` value from the project config
    (which may be None when no hub is configured).
    """
    env_override = os.environ.get("BW_HUB_URL")
    if env_override is not None:
        return env_override
    return self.config.hub_url

@property
@functools.lru_cache # noqa: B019
def state(self) -> State:
Expand Down
68 changes: 39 additions & 29 deletions blockwork/workflows/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,33 @@ def parse(cls, typ, path: Path):

def __call__(self, fn: Callable[..., Config]) -> Callable[..., None]:
@click.command()
@click.option(
"--parallel",
"-P",
is_flag=True,
default=False,
help=(
"Use Gator to run workflows in parallel, with the maximum number"
"of jobs set by the concurrency parameter"
),
)
@click.option(
"--concurrency",
"-c",
type=int,
default=max(1, multiprocessing.cpu_count() // 2),
help="Specify the maximum number of jobs allowed to run in parallel",
)
@click.option(
"--hub",
type=str,
default=None,
help="The gator hub url",
)
@click.pass_obj
def command(ctx, project=None, target=None, parallel=False, concurrency=1, *args, **kwargs):
def command(
ctx, project=None, target=None, parallel=False, concurrency=1, hub=None, *args, **kwargs
):
site_api = ConfigApi(ctx).with_site(ctx.site, self.SITE_TYPE)

if project:
Expand All @@ -96,31 +121,14 @@ def command(ctx, project=None, target=None, parallel=False, concurrency=1, *args
with site_api:
inst = fn(ctx, *args, **kwargs)
self._run(
ctx, *self.get_transform_tree(inst), concurrency=concurrency, parallel=parallel
ctx,
*self.get_transform_tree(inst),
concurrency=concurrency,
parallel=parallel,
hub=hub,
)

option_fns = []
option_fns.append(
click.option(
"--parallel",
"-P",
is_flag=True,
default=False,
help=(
"Use Gator to run workflows in parallel, with the maximum number"
"of jobs set by the concurrency parameter"
),
)
)
option_fns.append(
click.option(
"--concurrency",
"-c",
type=int,
default=max(1, multiprocessing.cpu_count() // 2),
help="Specify the maximum number of jobs allowed to run in parallel",
)
)
if self.project_type:
option_fns.append(click.option("--project", "-p", type=str, required=True))
if self.target_type:
Expand Down Expand Up @@ -242,6 +250,7 @@ def _run(
dependent_map: dict[Transform, OSet[Transform]],
parallel: bool,
concurrency: int,
hub: str | None = None,
):
"""
Run the workflow from transform dependency data.
Expand Down Expand Up @@ -300,14 +309,14 @@ def _run(
+ (f" with concurrency of {concurrency}" if parallel else "")
)

root_group = JobGroup(id="blockwork", cwd=ctx.host_root.as_posix())
root_group = JobGroup(ident="blockwork", cwd=ctx.host_root.as_posix())
idx_group = itertools.count()
prev_group = None
while run_scheduler.incomplete:
# Create group and chain dependency
group = JobGroup(id=f"stage_{next(idx_group)}")
group = JobGroup(ident=f"stage_{next(idx_group)}")
if prev_group is not None:
group.on_pass.append(prev_group.id)
group.on_pass.append(prev_group.ident)
prev_group = group
root_group.jobs.append(group)

Expand All @@ -321,7 +330,7 @@ def _run(
logging.info(f"Skipped transform (due to cached dependents): {transform}")
elif parallel:
# Assemble a unique job ID
job_id = f"{group.id}_{idx_job}"
job_id = f"{group.ident}_{idx_job}"
# Serialise the transform
spec_file = spec_dirx / f"{job_id}.json"
logging.debug(
Expand All @@ -341,7 +350,7 @@ def _run(
if DebugScope.current.VERBOSE:
args.insert(0, "--verbose")
job = Job(
id=f"{group.id}_job_{idx_job}",
ident=f"{group.ident}_job_{idx_job}",
cwd=ctx.host_root.as_posix(),
command="bw",
args=args,
Expand Down Expand Up @@ -376,6 +385,7 @@ def _run(
tracking=track_dirx,
sched_opts={"concurrency": concurrency},
glyph="🧱 Blockwork",
hub=hub or ctx.hub_url,
# TODO @intuity: In the long term a waiver system should be
# introduced to suppress errors if they are
# false, for now just set to a high value
Expand All @@ -389,7 +399,7 @@ def _run(
# Resolve the job
for idx, part in enumerate(job_id[1:]):
for sub in ptr.jobs:
if sub.id == part:
if sub.ident == part:
ptr = sub
break
else:
Expand Down
Loading

0 comments on commit be7d6e8

Please sign in to comment.