forked from microsoft/AIOpsLab
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase.py
69 lines (55 loc) · 2.3 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""Interface for fault injector classes.
NOTE: Each layer of injection like ApplicationFaultInjector, OSFaultInjector HardwareFaultInjector, etc.
are implemented as child classes of FaultInjector.
"""
import time
class FaultInjector:
def __init__(self, testbed):
self.testbed = testbed
# TODO: There are faults that not happen on the service level,
# so it is not a must to use the microservices param.
def inject_fault(
self,
fault_type: str,
fault_id: str,
start_time: float,
end_time: float,
microservices: list[str] = None,
):
"""
Base class to inject a fault into the specified microservices.
Parameters:
microservices (list[str]): list of microservices to inject the fault into.
fault_type (str): Type of fault to inject.
fault_id (str): Unique identifier for the fault.
start_time (float): Time to start the fault injection (epoch time).
end_time (float): Time to end the fault injection (epoch time).
"""
current_time = time.time()
if current_time < start_time:
time.sleep(start_time - current_time)
self._inject(microservices, fault_type)
# (@manish: we should not recover fault before agent solves)
# self._recover(microservices, fault_type)
def _inject(self, fault_type: str, microservices: list[str], duration: str = None):
if duration:
self._invoke_method("inject", fault_type, microservices, duration)
else:
self._invoke_method("inject", fault_type, microservices)
time.sleep(6)
def _recover(self, fault_type: str, microservices: list[str] = None,):
if microservices and fault_type:
self._invoke_method("recover", fault_type, microservices)
elif fault_type:
self._invoke_method("recover", fault_type)
time.sleep(6)
def _invoke_method(self, action_prefix, *args):
"""helper: injects/recovers faults based on name"""
method_name = f"{action_prefix}_{args[0]}"
method = getattr(self, method_name, None)
if method:
method(*args[1:])
else:
print(f"Unknown fault type: {args[0]}")