Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1937801 - Implement mechanism to use caches for common tools in run transforms #623

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
10 changes: 7 additions & 3 deletions packages/pytest-taskgraph/src/pytest_taskgraph/fixtures/gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,11 @@ def inner(
if extra_params:
parameters.update(extra_params)
if extra_graph_config:
graph_config._config.update(extra_graph_config)
# We need this intermediate variable because `GraphConfig` is
# frozen and we can't set attributes on it.
new_graph_config = merge(graph_config._config, extra_graph_config)
graph_config._config.update(new_graph_config)

return TransformConfig(
"test",
str(here),
Expand All @@ -220,12 +224,12 @@ def inner(

@pytest.fixture
def run_transform(make_transform_config):
def inner(func, tasks, config=None):
def inner(func, tasks, config=None, **kwargs):
if not isinstance(tasks, list):
tasks = [tasks]

if not config:
config = make_transform_config()
config = make_transform_config(**kwargs)
return list(func(config, tasks))

return inner
Expand Down
15 changes: 13 additions & 2 deletions src/taskgraph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from voluptuous import All, Any, Extra, Length, Optional, Required

from .util import path
from .util.caches import CACHES
from .util.python_path import find_object
from .util.schema import Schema, optionally_keyed_by, validate_schema
from .util.yaml import load_yaml
Expand Down Expand Up @@ -74,6 +75,16 @@
"index-path-regexes",
description="Regular expressions matching index paths to be summarized.",
): [str],
Optional(
"run",
description="Configuration related to the 'run' transforms.",
): {
Optional(
"use-caches",
description="List of caches to enable, or a boolean to "
"enable/disable all of them.",
): Any(bool, list(CACHES.keys())),
},
Required("repositories"): All(
{
str: {
Expand Down Expand Up @@ -106,8 +117,8 @@ def __getitem__(self, name):
def __contains__(self, name):
    """Return True if *name* is a key of the underlying config mapping.

    Enables ``name in graph_config`` membership tests by delegating to
    the wrapped ``_config`` dict.
    """
    return name in self._config

def get(self, name):
return self._config.get(name)
def get(self, name, default=None):
return self._config.get(name, default)

def register(self):
"""
Expand Down
11 changes: 4 additions & 7 deletions src/taskgraph/run-task/run-task
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ def _display_python_version():


def main(args):
os.environ["TASK_WORKDIR"] = os.getcwd()
task_workdir = os.environ["TASK_WORKDIR"] = os.getcwd()
print_line(
b"setup",
b"run-task started in %s\n" % os.environ["TASK_WORKDIR"].encode("utf-8"),
Expand Down Expand Up @@ -1316,12 +1316,9 @@ def main(args):
resource_process = None

try:
for k in ["MOZ_FETCHES_DIR", "UPLOAD_DIR"] + [
"{}_PATH".format(repository["project"].upper())
for repository in repositories
]:
if k in os.environ:
os.environ[k] = os.path.abspath(os.environ[k])
for k, v in os.environ.items():
if "$TASK_WORKDIR" in v:
os.environ[k] = v.replace("$TASK_WORKDIR", task_workdir)
print_line(
b"setup",
b"%s is %s\n" % (k.encode("utf-8"), os.environ[k].encode("utf-8")),
Expand Down
2 changes: 1 addition & 1 deletion src/taskgraph/transforms/run/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ def cmp_artifacts(a):
"task-reference": json.dumps(task_fetches, sort_keys=True)
}

env.setdefault("MOZ_FETCHES_DIR", "fetches")
env.setdefault("MOZ_FETCHES_DIR", "$TASK_WORKDIR/fetches")

yield task

Expand Down
119 changes: 80 additions & 39 deletions src/taskgraph/transforms/run/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
consistency.
"""

import hashlib
import json
from typing import Any, Dict, List, Union

from taskgraph.transforms.base import TransformConfig
from taskgraph.util import path
from taskgraph.util.caches import CACHES, get_checkout_dir
from taskgraph.util.taskcluster import get_artifact_prefix


Expand All @@ -31,10 +34,10 @@ def add_cache(task, taskdesc, name, mount_point, skip_untrusted=False):
skip_untrusted (bool): Whether cache is used in untrusted environments
(default: False). Only applies to docker-worker.
"""
if not task["run"].get("use-caches", True):
return

worker = task["worker"]
if worker["implementation"] not in ("docker-worker", "generic-worker"):
# caches support not implemented
return

if worker["implementation"] == "docker-worker":
taskdesc["worker"].setdefault("caches", []).append(
Expand All @@ -54,10 +57,6 @@ def add_cache(task, taskdesc, name, mount_point, skip_untrusted=False):
}
)

else:
# Caches not implemented
pass


def add_artifacts(config, task, taskdesc, path):
taskdesc["worker"].setdefault("artifacts", []).append(
Expand Down Expand Up @@ -91,54 +90,28 @@ def support_vcs_checkout(config, task, taskdesc, repo_configs, sparse=False):
reserved for ``run-task`` tasks.
"""
worker = task["worker"]
is_mac = worker["os"] == "macosx"
assert worker["os"] in ("linux", "macosx", "windows")
is_win = worker["os"] == "windows"
is_linux = worker["os"] == "linux"
is_docker = worker["implementation"] == "docker-worker"
assert is_mac or is_win or is_linux

checkoutdir = get_checkout_dir(task)
if is_win:
checkoutdir = "./build"
hgstore = "y:/hg-shared"
elif is_docker:
checkoutdir = "{workdir}/checkouts".format(**task["run"])
hgstore = f"{checkoutdir}/hg-store"
else:
checkoutdir = "./checkouts"
hgstore = f"{checkoutdir}/hg-shared"

vcsdir = checkoutdir + "/" + get_vcsdir_name(worker["os"])
cache_name = "checkouts"

# Robust checkout does not clean up subrepositories, so ensure that tasks
# that checkout different sets of paths have separate caches.
# See https://bugzilla.mozilla.org/show_bug.cgi?id=1631610
if len(repo_configs) > 1:
checkout_paths = {
"\t".join([repo_config.path, repo_config.prefix])
for repo_config in sorted(
repo_configs.values(), key=lambda repo_config: repo_config.path
)
}
checkout_paths_str = "\n".join(checkout_paths).encode("utf-8")
digest = hashlib.sha256(checkout_paths_str).hexdigest()
cache_name += f"-repos-{digest}"

# Sparse checkouts need their own cache because they can interfere
# with clients that aren't sparse aware.
if sparse:
cache_name += "-sparse"

add_cache(task, taskdesc, cache_name, checkoutdir)

vcsdir = f"{checkoutdir}/{get_vcsdir_name(worker['os'])}"
env = taskdesc["worker"].setdefault("env", {})
env.update(
{
"HG_STORE_PATH": hgstore,
"REPOSITORIES": json.dumps(
{repo.prefix: repo.name for repo in repo_configs.values()}
),
"VCS_PATH": vcsdir,
# If vcsdir is already absolute this will return it unmodified.
"VCS_PATH": path.join("$TASK_WORKDIR", vcsdir),
}
)
for repo_config in repo_configs.values():
Expand All @@ -162,3 +135,71 @@ def support_vcs_checkout(config, task, taskdesc, repo_configs, sparse=False):
# only some worker platforms have taskcluster-proxy enabled
if task["worker"]["implementation"] in ("docker-worker",):
taskdesc["worker"]["taskcluster-proxy"] = True

return vcsdir


def should_use_cache(
    name: str,
    use_caches: Union[bool, List[str]],
    has_checkout: bool,
) -> bool:
    """Decide whether the cache called *name* should be enabled.

    Args:
        name: Identifier of the cache under consideration.
        use_caches: Either a blanket boolean switch that enables or
            disables every cache, or an explicit list of cache names
            to enable.
        has_checkout: Whether the task performs a VCS checkout.

    Returns:
        True if the cache should be added to the task.
    """
    # The 'checkout' cache is pointless for tasks without a clone.
    # Suppressing it here allows 'checkout' to be listed as a default
    # cache without impacting irrelevant tasks.
    if not has_checkout and name == "checkout":
        return False

    # A bare boolean turns all caches on or off wholesale.
    if isinstance(use_caches, bool):
        return use_caches

    # Otherwise the selection is an explicit allow-list of cache names.
    return name in use_caches


def support_caches(
    config: TransformConfig, task: Dict[str, Any], taskdesc: Dict[str, Any]
):
    """Add caches for common tools.

    Walks the registry of known caches (``CACHES``) and, for each cache
    selected by the task's ``run.use-caches`` setting (falling back to the
    project-wide graph-config default, then to just the checkout cache),
    mounts it on the task and optionally exports its location via an
    environment variable.
    """
    run = task["run"]
    has_checkout = run["checkout"]

    # The default working directory depends on the worker implementation.
    workdir = run.get("workdir")
    if not workdir:
        is_docker = task["worker"]["implementation"] == "docker-worker"
        workdir = "/builds/worker" if is_docker else ""

    default_cache_root = path.join(workdir, ".task-cache")

    selection = run.get("use-caches")
    if selection is None:
        # Use project default values for filtering caches; default to the
        # checkout cache if no selection is specified anywhere.
        taskgraph_cfg = config.graph_config.get("taskgraph", {})
        selection = taskgraph_cfg.get("run", {}).get("use-caches", ["checkout"])

    for name, cache_cfg in CACHES.items():
        if not should_use_cache(name, selection, has_checkout):
            continue

        # A cache entry may compute its mount point dynamically; otherwise
        # it lives under the shared `.task-cache` directory.
        if "cache_dir" in cache_cfg:
            assert callable(cache_cfg["cache_dir"])
            cache_dir = cache_cfg["cache_dir"](task)
        else:
            cache_dir = f"{default_cache_root}/{name}"

        # Likewise, the persistent cache name may be derived dynamically
        # (e.g. to key the cache on task attributes).
        if "cache_name" in cache_cfg:
            assert callable(cache_cfg["cache_name"])
            cache_name = cache_cfg["cache_name"](config, task)
        else:
            cache_name = name

        env_var = cache_cfg.get("env")
        if env_var:
            # If cache_dir is already absolute, the `.join` call returns it
            # as is. In that case, $TASK_WORKDIR will get interpolated by
            # run-task.
            env = taskdesc["worker"].setdefault("env", {})
            env[env_var] = path.join("$TASK_WORKDIR", cache_dir)

        add_cache(task, taskdesc, cache_name, cache_dir)
39 changes: 12 additions & 27 deletions src/taskgraph/transforms/run/run_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@
from voluptuous import Any, Optional, Required

from taskgraph.transforms.run import run_task_using
from taskgraph.transforms.run.common import support_vcs_checkout
from taskgraph.transforms.run.common import (
support_caches,
support_vcs_checkout,
)
from taskgraph.transforms.task import taskref_or_string
from taskgraph.util import path, taskcluster
from taskgraph.util.caches import CACHES
from taskgraph.util.schema import Schema

EXEC_COMMANDS = {
Expand All @@ -24,12 +28,11 @@
run_task_schema = Schema(
{
Required("using"): "run-task",
# if true, add a cache at ~worker/.cache, which is where things like pip
# tend to hide their caches. This cache is never added for level-1 tasks.
# TODO Once bug 1526028 is fixed, this and 'use-caches' should be merged.
Required("cache-dotcache"): bool,
# Whether or not to use caches.
Optional("use-caches"): bool,
# Which caches to use. May take a boolean in which case either all
# (True) or no (False) caches will be used. Alternatively, it can
# accept a list of caches to enable. Defaults to only the checkout cache
# enabled.
Optional("use-caches", "caches"): Any(bool, list(CACHES.keys())),
# if true (the default), perform a checkout on the worker
Required("checkout"): Any(bool, {str: dict}),
Optional(
Expand Down Expand Up @@ -70,15 +73,14 @@ def common_setup(config, task, taskdesc, command):
for (repo, config) in run["checkout"].items()
}

support_vcs_checkout(
vcs_path = support_vcs_checkout(
config,
task,
taskdesc,
repo_configs=repo_configs,
sparse=bool(run["sparse-profile"]),
)

vcs_path = taskdesc["worker"]["env"]["VCS_PATH"]
for repo_config in repo_configs.values():
checkout_path = path.join(vcs_path, repo_config.path)
command.append(f"--{repo_config.prefix}-checkout={checkout_path}")
Expand All @@ -104,11 +106,11 @@ def common_setup(config, task, taskdesc, command):
if "cwd" in run:
command.extend(("--task-cwd", run["cwd"]))

support_caches(config, task, taskdesc)
taskdesc["worker"].setdefault("env", {})["MOZ_SCM_LEVEL"] = config.params["level"]


worker_defaults = {
"cache-dotcache": False,
"checkout": True,
"sparse-profile": None,
"run-as-root": False,
Expand All @@ -135,16 +137,6 @@ def docker_worker_run_task(config, task, taskdesc):
command = run.pop("run-task-command", ["/usr/local/bin/run-task"])
common_setup(config, task, taskdesc, command)

if run.get("cache-dotcache"):
worker["caches"].append(
{
"type": "persistent",
"name": "{project}-dotcache".format(**config.params),
"mount-point": "{workdir}/.cache".format(**run),
"skip-untrusted": True,
}
)

run_command = run["command"]

# dict is for the case of `{'task-reference': str}`.
Expand Down Expand Up @@ -177,13 +169,6 @@ def generic_worker_run_task(config, task, taskdesc):
common_setup(config, task, taskdesc, command)

worker.setdefault("mounts", [])
if run.get("cache-dotcache"):
worker["mounts"].append(
{
"cache-name": "{project}-dotcache".format(**config.params),
"directory": "{workdir}/.cache".format(**run),
}
)
worker["mounts"].append(
{
"content": {
Expand Down
Loading
Loading