Skip to content

Commit

Permalink
Add reporting hooks for transforms (#126)
Browse files Browse the repository at this point in the history
Allow transforms to define `tf_report` and `tf_cls_report` methods which
are run on the host after all transforms have run.

This provides an opportunity for transforms to output important
information to the user when running in parallel.

Also pinned the CI runner version to `ubuntu-22.04` because the `ubuntu-latest` default changed:
actions/runner-images#10636
  • Loading branch information
Kotarski authored Jan 24, 2025
1 parent 6a39e1f commit 0d41819
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
uses: actions/deploy-pages@v2

unit_tests:
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
timeout-minutes: 15
strategy:
fail-fast: false
Expand Down
11 changes: 11 additions & 0 deletions blockwork/transforms/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,5 +1160,16 @@ def execute(self, ctx: "Context", /) -> Generator["Invocation", Result, None]:
"""
raise NotImplementedError

def tf_report(self, ctx: "Context", /) -> None:
    """
    Per-instance reporting hook, called on the host once all transforms
    have finished running.

    The base implementation is a deliberate no-op; a transform may override
    it to surface information about this particular instance to the user.

    :param ctx: Context object handed over by the caller
    """
    return None

@classmethod
def tf_cls_report(cls, ctx: "Context", transforms: list[Self], /) -> None:
"""
Host hook to report on all instances of this transform after all transforms have run.
"""

def __repr__(self) -> str:
    """
    Build a short debug representation made of the concrete class name and
    the instance's cached input hash.
    """
    cls_name = type(self).__name__
    input_hash = self._cached_input_hash
    return f"<{cls_name} hash='{input_hash}'>"
12 changes: 12 additions & 0 deletions blockwork/workflows/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ def _run(
args=args,
resources=[Cores(count=1), Memory(size=1, unit="GB")],
)
run_transforms.add(transform)
group.jobs.append(job)
scheduled.append(transform)
else:
Expand Down Expand Up @@ -422,6 +423,17 @@ def _run(
if is_caching:
Cache.prune_all(ctx)

# Run reporting stages
run_transform_instances = defaultdict(OSet)
for transform in run_transforms:
run_transform_instances[type(transform)].add(transform)
if (tf_report := getattr(transform, "tf_report", None)) is not None:
tf_report(ctx)

for transform_class, transforms in run_transform_instances.items():
if (tf_cls_report := getattr(transform_class, "tf_cls_report", None)) is not None:
tf_cls_report(ctx, list(transforms))

# This is primarily returned for unit-testing
return SimpleNamespace(
run=run_transforms,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ordered-set = "4.1.0"
filelock = "3.14.0"
pytz = "2024.1"
requests = "2.31.0"
gator-eda = { git = "https://github.com/Intuity/gator.git", rev = "67f1f4ebcd571154615630a447b2fc514b1cd0d4" }
gator-eda = { git = "https://github.com/Intuity/gator.git", rev = "9b6c47b6a70f65a4da55e0b17470da3279b563be" }
boto3 = "1.34.103"

[tool.poetry.group.dev.dependencies]
Expand Down

0 comments on commit 0d41819

Please sign in to comment.