Skip to content

Commit

Permalink
Do preflight check in learn phase (#321)
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Gu <[email protected]>
tylergu authored Feb 16, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent fb8af27 commit 5a8c915
Showing 4 changed files with 65 additions and 36 deletions.
12 changes: 4 additions & 8 deletions acto/__main__.py
Original file line number Diff line number Diff line change
@@ -117,10 +117,6 @@
logger.info("Acto started with [%s]", sys.argv)
logger.info("Operator config: %s", config)

# Preload frequently used images to avoid ImagePullBackOff
if args.preload_images:
logger.info("%s will be preloaded into Kind cluster", args.preload_images)

if args.context is None:
context_cache = os.path.join(
os.path.dirname(config.seed_custom_resource), "context.json"
@@ -134,18 +130,18 @@
acto = Acto(
workdir_path=args.workdir_path,
operator_config=config,
cluster_runtime=args.cluster_runtime,
preload_images_=args.preload_images,
cluster_runtime="KIND",
preload_images_=None,
context_file=context_cache,
helper_crd=args.helper_crd,
helper_crd=None,
num_workers=args.num_workers,
num_cases=args.num_cases,
dryrun=args.dryrun,
analysis_only=args.learn_analysis_only,
is_reproduce=False,
input_model=DeterministicInputModel,
apply_testcase_f=apply_testcase_f,
delta_from=args.delta_from,
delta_from=None,
focus_fields=config.focus_fields,
)
generation_time = datetime.now()
43 changes: 29 additions & 14 deletions acto/engine.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
import yaml

from acto.checker.checker_set import CheckerSet
from acto.checker.impl.health import HealthChecker
from acto.common import kubernetes_client, print_event
from acto.constant import CONST
from acto.deploy import Deploy
@@ -268,11 +269,11 @@ def __init__(
self.is_reproduce = is_reproduce

self.snapshots: list[Snapshot] = []
self.discarded_testcases: dict[
str, list[TestCase]
] = {} # List of test cases failed to run
self.apply_testcase_f = apply_testcase_f

# List of test cases failed to run
self.discarded_testcases: dict[str, list[TestCase]] = {}

self.apply_testcase_f = apply_testcase_f
self.curr_trial = 0

def run(
@@ -653,6 +654,7 @@ def run_and_check(
retry = 0
while True:
snapshot, _ = runner.run(input_cr, generation)
snapshot.dump(runner.trial_dir)
cli_result = check_kubectl_cli(snapshot)
if cli_result == CliStatus.CONNECTION_REFUSED:
# Connection refused due to webhook not ready, let's wait for a bit
@@ -696,6 +698,7 @@ def run_recovery(
logger.debug("Running recovery")
recovery_input = self.snapshots[RECOVERY_SNAPSHOT].input_cr
snapshot, _ = runner.run(recovery_input, generation=-1)
snapshot.dump(runner.trial_dir)
result = check_state_equality(
snapshot,
self.snapshots[RECOVERY_SNAPSHOT],
@@ -950,26 +953,38 @@ def __learn(self, context_file, helper_crd, analysis_only=False):
if deployed:
break
apiclient = kubernetes_client(learn_kubeconfig, learn_context_name)

self.context["crd"] = process_crd(
apiclient,
KubectlClient(learn_kubeconfig, learn_context_name),
self.crd_name,
helper_crd,
)

learn_dir = os.path.join(
self.workdir_path,
"trial-learn",
)
os.makedirs(learn_dir, exist_ok=True)
runner = Runner(
self.context,
"learn",
learn_dir,
learn_kubeconfig,
learn_context_name,
self.deploy.operator_container_name,
)
runner.run_without_collect(
self.operator_config.seed_custom_resource
)
snapshot, _ = runner.run(input_cr=self.seed, generation=0)
snapshot.dump(runner.trial_dir)
health_result = HealthChecker().check(0, snapshot, None)
if health_result is not None:
raise RuntimeError(
f"Health check failed during learning phase: {health_result['message']}"
"Please make sure the operator config is correct"
)

update_preload_images(
self.context, self.cluster.get_node_list("learn")
)
self.context["crd"] = process_crd(
apiclient,
KubectlClient(learn_kubeconfig, learn_context_name),
self.crd_name,
helper_crd,
)
self.cluster.delete_cluster("learn", learn_kubeconfig)

run_end_time = time.time()
44 changes: 31 additions & 13 deletions acto/post_process/post_diff_test.py
Original file line number Diff line number Diff line change
@@ -78,6 +78,7 @@ def compare_system_equality(
prev_system_state: dict,
additional_exclude_paths: Optional[list[str]] = None,
) -> Optional[dict]:
"""Compare two system states and return the diff if they are not equal, otherwise return None"""
logger = get_thread_logger(with_prefix=False)
curr_system_state = deepcopy(curr_system_state)
prev_system_state = deepcopy(prev_system_state)
@@ -116,20 +117,20 @@ def compare_system_equality(
new_pods[k] = v
prev_system_state["pod"] = new_pods

for name, obj in prev_system_state["secret"].items():
for _, obj in prev_system_state["secret"].items():
if "data" in obj and obj["data"] is not None:
for key, data in obj["data"].items():
try:
obj["data"][key] = json.loads(data)
except:
except json.JSONDecodeError:
pass

for name, obj in curr_system_state["secret"].items():
for _, obj in curr_system_state["secret"].items():
if "data" in obj and obj["data"] is not None:
for key, data in obj["data"].items():
try:
obj["data"][key] = json.loads(data)
except:
except json.JSONDecodeError:
pass

if len(curr_system_state["secret"]) != len(prev_system_state["secret"]):
@@ -217,6 +218,7 @@ def compare_system_equality(


def postprocess_deepdiff(diff: TreeResult):
"""Postprocess the deepdiff result to ignore non-deterministic fields"""
# ignore PVC add/removal, because PVC can be intentionally left behind
logger = get_thread_logger(with_prefix=False)
if "dictionary_item_removed" in diff:
@@ -245,6 +247,7 @@ def postprocess_deepdiff(diff: TreeResult):


def compare_func(x, y, _: DiffLevel = None):
"""Compare function for deepdiff taking key and operator into account"""
try:
if "name" not in x or "name" not in y:
return x["key"] == y["key"] and x["operator"] == y["operator"]
@@ -259,6 +262,8 @@ def compare_func(x, y, _: DiffLevel = None):


class NameOperator(BaseOperator):
"""Operator that compares objects, taking the name field into consideration"""

def give_up_diffing(self, level, diff_instance):
_ = diff_instance
x_name = level.t1
@@ -273,6 +278,8 @@ def give_up_diffing(self, level, diff_instance):


class TypeChangeOperator(BaseOperator):
"""Operator that compares objects, taking type changes into consideration"""

def give_up_diffing(self, level, diff_instance):
if level.t1 is None:
if isinstance(level.t2, dict):
@@ -289,6 +296,7 @@ def give_up_diffing(self, level, diff_instance):


def get_nondeterministic_fields(s1, s2, additional_exclude_paths):
"""Get the nondeterministic fields between two system states"""
nondeterministic_fields = []
result = compare_system_equality(
s1, s2, additional_exclude_paths=additional_exclude_paths
@@ -320,6 +328,8 @@ def get_nondeterministic_fields(s1, s2, additional_exclude_paths):


class AdditionalRunner:
"""Additional Runner"""

def __init__(
self,
context: Dict,
@@ -345,6 +355,7 @@ def __init__(
self._images_archive = os.path.join(workdir, "images.tar")

def run_cr(self, cr, trial, gen):
"""Run a CR and return the snapshot"""
self._cluster.restart_cluster(self._cluster_name, self._kubeconfig)
self._cluster.load_images(self._images_archive, self._cluster_name)
apiclient = kubernetes_client(self._kubeconfig, self._context_name)
@@ -366,6 +377,7 @@ def run_cr(self, cr, trial, gen):
operator_container_name=self._deploy.operator_container_name,
)
snapshot, _ = runner.run(cr, generation=self._generation)
snapshot.dump(runner.trial_dir)
difftest_result = {
"input_digest": hashlib.md5(
json.dumps(cr, sort_keys=True).encode("utf-8")
@@ -386,6 +398,8 @@ def run_cr(self, cr, trial, gen):


class DeployRunner:
"""Deploy runner for Acto"""

def __init__(
self,
workqueue: multiprocessing.Queue,
@@ -412,6 +426,7 @@ def __init__(
self._images_archive = os.path.join(workdir, "images.tar")

def run(self):
"""Run the deploy runner"""
logger = get_thread_logger(with_prefix=True)
generation = 0
trial_dir = os.path.join(self._workdir, f"trial-{self._worker_id:02d}")
@@ -450,6 +465,7 @@ def run(self):
cr = group.iloc[0]["input"]

snapshot, err = runner.run(cr, generation=generation)
snapshot.dump(runner.trial_dir)
after_run_time = time.time()
err = True
difftest_result = DiffTestResult(
@@ -479,14 +495,11 @@ def run(self):
self._cluster.load_images(
self._images_archive, self._cluster_name
)
apiclient = kubernetes_client(
self._kubeconfig, self._context_name
)
kubectl_client = KubectlClient(
self._kubeconfig, self._context_name
)
after_k8s_bootstrap_time = time.time()
deployed = self._deploy.deploy_with_retry(
_ = self._deploy.deploy_with_retry(
self._kubeconfig,
self._context_name,
kubectl_client=kubectl_client,
@@ -505,6 +518,8 @@ def run(self):


class PostDiffTest(PostProcessor):
"""Post diff test class for Acto"""

def __init__(
self,
testrun_dir: str,
@@ -564,6 +579,7 @@ def __init__(
print(series.head())

def post_process(self, workdir: str, num_workers: int = 1):
"""Start the post process"""
if not os.path.exists(workdir):
os.mkdir(workdir)
cluster = kind.Kind(
@@ -612,6 +628,7 @@ def post_process(self, workdir: str, num_workers: int = 1):
p.join()

def check(self, workdir: str, num_workers: int = 1):
"""Check the post process result"""
logger = get_thread_logger(with_prefix=True)
logger.info(
"Additional exclude paths: %s", self.config.diff_ignore_fields
@@ -639,6 +656,7 @@ def check(self, workdir: str, num_workers: int = 1):
def check_diff_test_result(
self, workqueue: multiprocessing.Queue, workdir: str, worker_id: int
):
"""Check the diff test result"""
additional_runner_dir = os.path.join(
workdir, f"additional-runner-{worker_id}"
)
@@ -690,8 +708,7 @@ def check_diff_test_result(
)
if step_result is None:
continue
else:
group_errs.append(step_result)
group_errs.append(step_result)
if len(group_errs) > 0:
with open(
os.path.join(
@@ -703,8 +720,6 @@ def check_diff_test_result(
) as result_f:
json.dump(group_errs, result_f, cls=ActoEncoder, indent=4)

return None

@staticmethod
def check_diff_test_step(
diff_test_result: DiffTestResult,
@@ -713,6 +728,8 @@ def check_diff_test_step(
run_check_indeterministic: bool = False,
additional_runner: Optional[AdditionalRunner] = None,
) -> Optional[DifferentialOracleResult]:
"""Check the diff test step result and return the differential oracle
result if it fails, otherwise return None"""
logger = get_thread_logger(with_prefix=True)
trial_dir = original_result.run_result.step_id.trial
gen = original_result.run_result.step_id.generation
@@ -764,7 +781,8 @@ def check_diff_test_step(
gen,
)
return DifferentialOracleResult(
message="failed attempt recovering to seed state - system state diff",
message="failed attempt recovering to seed state "
"- system state diff",
diff=result,
from_step=StepID(
trial=trial_dir, generation=gen
2 changes: 1 addition & 1 deletion acto/runner/runner.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Runner module for Acto"""

import base64
import multiprocessing
import queue
@@ -180,7 +181,6 @@ def run(
not_ready_pods_logs=unready_pod_logs,
generation=generation,
)
snapshot.dump(self.trial_dir)
return snapshot, err

def run_without_collect(self, seed_file: str):

0 comments on commit 5a8c915

Please sign in to comment.