generated from hybrid-cloud-patterns/example
-
Notifications
You must be signed in to change notification settings - Fork 50
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
805 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
__version__ = "0.1.0" | ||
__loggername__ = "css_logger" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
import os | ||
|
||
import pytest | ||
from kubernetes import config | ||
from kubernetes.client import Configuration | ||
from openshift.dynamic import DynamicClient | ||
|
||
from . import __loggername__ | ||
from .css_logger import CSS_Logger | ||
|
||
|
||
def pytest_addoption(parser): | ||
parser.addoption( | ||
"--kubeconfig", | ||
action="store", | ||
default=None, | ||
help="The full path to the kubeconfig file to be used", | ||
) | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def get_kubeconfig(request): | ||
if request.config.getoption("--kubeconfig"): | ||
k8config = request.config.getoption("--kubeconfig") | ||
elif "KUBECONFIG" in os.environ.keys() and os.environ["KUBECONFIG"]: | ||
k8config = os.environ["KUBECONFIG"] | ||
else: | ||
raise ValueError( | ||
"A kubeconfig file was not provided. Please provide one either " | ||
"via the --kubeconfig command option or by setting a KUBECONFIG " | ||
"environment variable" | ||
) | ||
return k8config | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def kube_config(get_kubeconfig): | ||
kc = Configuration | ||
config.load_kube_config(config_file=get_kubeconfig, client_configuration=kc) | ||
return kc | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def openshift_dyn_client(get_kubeconfig): | ||
return DynamicClient(client=config.new_client_from_config(get_kubeconfig)) | ||
|
||
|
||
@pytest.fixture(scope="session", autouse=True) | ||
def setup_logger(): | ||
logger = CSS_Logger(__loggername__) | ||
return logger |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
from ocp_resources.resource import NamespacedResource, Resource | ||
|
||
|
||
class DeploymentConfig(NamespacedResource): | ||
""" | ||
OpenShift DeploymentConfig object. | ||
""" | ||
|
||
api_version = "apps.openshift.io/v1" | ||
kind = "DeploymentConfig" | ||
|
||
|
||
class ArgoCD(NamespacedResource): | ||
""" | ||
OpenShift ArgoCD / GitOps object. | ||
""" | ||
|
||
api_group = "argoproj.io" | ||
api_version = NamespacedResource.ApiVersion.V1ALPHA1 | ||
kind = "Application" | ||
|
||
@property | ||
def health(self): | ||
""" | ||
Check the health of of the argocd application | ||
:return: boolean | ||
""" | ||
|
||
if ( | ||
self.instance.status.operationState.phase == "Succeeded" | ||
and self.instance.status.health.status == "Healthy" | ||
): | ||
return True | ||
return False | ||
|
||
|
||
class ManagedCluster(Resource): | ||
""" | ||
OpenShift Managed Cluster object. | ||
""" | ||
|
||
api_version = "cluster.open-cluster-management.io/v1" | ||
|
||
@property | ||
def self_registered(self): | ||
""" | ||
Check if managed cluster is self registered in to ACM running on hub site | ||
:param name: (str) name of managed cluster | ||
:param namespace: namespace | ||
:return: Tuple of boolean and dict on success | ||
""" | ||
is_joined = False | ||
status = dict() | ||
|
||
for condition in self.instance.status.conditions: | ||
if condition["type"] == "HubAcceptedManagedCluster": | ||
status["HubAcceptedManagedCluster"] = condition["status"] | ||
elif condition["type"] == "ManagedClusterConditionAvailable": | ||
status["ManagedClusterConditionAvailable"] = condition["status"] | ||
elif condition["type"] == "ManagedClusterJoined": | ||
is_joined = True | ||
status["ManagedClusterJoined"] = condition["status"] | ||
|
||
return is_joined, status |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import logging | ||
import os | ||
from datetime import datetime | ||
from logging.handlers import RotatingFileHandler | ||
|
||
LOG_DIR = os.path.join(os.environ["WORKSPACE"], ".teflo/.results/test_execution_logs") | ||
if not os.path.exists(LOG_DIR): | ||
os.mkdir(LOG_DIR) | ||
|
||
|
||
class CSS_Logger(object): | ||
_logger = None | ||
|
||
def __new__(cls, *args, **kwargs): | ||
if cls._logger is None: | ||
cls._logger = super(CSS_Logger, cls).__new__(cls) | ||
# Put any initialization here. | ||
cls._logger = logging.getLogger(args[0]) | ||
cls._logger.setLevel(logging.DEBUG) | ||
|
||
pytest_current_test = os.environ.get("PYTEST_CURRENT_TEST") | ||
split_test_name = pytest_current_test.split("::")[1] | ||
short_test_name = split_test_name.split(" ")[0] | ||
|
||
datestring = datetime.now().strftime("%Y_%m_%d_%H_%M_%S") | ||
filename = "{}_{}.log".format(short_test_name, datestring) | ||
filepath = os.path.join(LOG_DIR, filename) | ||
|
||
# Create a file handler for logging level above DEBUG | ||
file_handler = RotatingFileHandler( | ||
filepath, maxBytes=1024 * 1024 * 1024, backupCount=20 | ||
) | ||
|
||
# Create a logging format | ||
log_formatter = logging.Formatter( | ||
"%(asctime)s " | ||
"[%(levelname)s] " | ||
"%(module)s:%(lineno)d " | ||
"%(message)s" | ||
) | ||
file_handler.setFormatter(log_formatter) | ||
|
||
# Create a stream handler for logging level above INFO | ||
stream_handler = logging.StreamHandler() | ||
stream_handler.setLevel(logging.INFO) | ||
stream_handler.setFormatter(log_formatter) | ||
|
||
# Add the handlers to the logger | ||
cls._logger.addHandler(file_handler) | ||
cls._logger.addHandler(stream_handler) | ||
|
||
return cls._logger |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
import base64 | ||
import fileinput | ||
import logging | ||
import os | ||
import subprocess | ||
|
||
import requests | ||
import yaml | ||
from ocp_resources.secret import Secret | ||
from requests import HTTPError, RequestException | ||
from urllib3.exceptions import InsecureRequestWarning, ProtocolError | ||
|
||
from . import __loggername__ | ||
|
||
logger = logging.getLogger(__loggername__) | ||
|
||
|
||
def load_yaml_file(file_path): | ||
""" | ||
Load and parse the yaml file | ||
:param file_path: (str) file path | ||
:return: (dict) yaml_config_obj in the form of Python dict | ||
""" | ||
yaml_config_obj = None | ||
with open(file_path, "r") as yfh: | ||
try: | ||
yaml_config_obj = yaml.load(yfh, Loader=yaml.FullLoader) | ||
except Exception as ex: | ||
raise yaml.YAMLError("YAML Syntax Error:\n %s" % ex) | ||
logger.info("Yaml Config : %s", yaml_config_obj) | ||
return yaml_config_obj | ||
|
||
|
||
def find_number_of_edge_sites(dir_path): | ||
""" | ||
Find the number of edge (managed cluster) sites folder | ||
:param dir_path: (dtr) dir path where edge site manifest resides | ||
:return: (list) site_names | ||
""" | ||
site_names = list() | ||
list_of_dirs = os.listdir(path=dir_path) | ||
|
||
for site_dir in list_of_dirs: | ||
if "staging" in site_dir: | ||
site_names.append(site_dir) | ||
|
||
return site_names | ||
|
||
|
||
def get_long_live_bearer_token( | ||
dyn_client, namespace="default", sub_string="default-token" | ||
): | ||
""" | ||
Get bearer token from secrets to authorize openshift cluster | ||
:param sub_string: (str) substring of secrets name to find actual secret name since openshift append random | ||
5 ascii digit at the end of every secret name | ||
:param namespace: (string) name of namespace where secret exist | ||
:return: (string) secret token for specified secret | ||
""" | ||
filtered_secrets = [] | ||
try: | ||
for secret in Secret.get(dyn_client=dyn_client, namespace=namespace): | ||
if sub_string in secret.instance.metadata.name: | ||
filtered_secrets.append(secret.instance.data.token) | ||
except StopIteration: | ||
logger.exception( | ||
"Specified substring %s doesn't exist in namespace %s", | ||
sub_string, | ||
namespace, | ||
) | ||
except ProtocolError as e: | ||
# See https://github.com/kubernetes-client/python/issues/1225 | ||
logger.info( | ||
"Skip %s... because kubelet disconnect client after default" " 10m...", | ||
e, | ||
) | ||
|
||
# All secret tokens in openshift are base64 encoded. | ||
# Decode base64 string into byte and convert byte to str | ||
if len(filtered_secrets) > 0: | ||
bearer_token = base64.b64decode(filtered_secrets[-1]).decode() | ||
return bearer_token | ||
else: | ||
return None | ||
|
||
|
||
def get_site_response(site_url, bearer_token): | ||
""" | ||
:param site_url: (str) Site API end point | ||
:param bearer_token: (str) bearer token | ||
:return: (dict) site_response | ||
""" | ||
site_response = None | ||
headers = {"Authorization": "Bearer " + bearer_token} | ||
|
||
try: | ||
# Suppress only the single warning from urllib3 needed. | ||
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) | ||
site_response = requests.get(site_url, headers=headers, verify=False) | ||
except (ConnectionError, HTTPError, RequestException) as e: | ||
logger.exception( | ||
"Failed to connect %s due to refused connection or unsuccessful" | ||
" status code %s", | ||
site_url, | ||
e, | ||
) | ||
logger.debug("Site Response %s: ", site_response) | ||
|
||
return site_response | ||
|
||
|
||
def execute_shell_command_local(cmd): | ||
""" | ||
Executes a shell command in a subprocess, wait until it has completed. | ||
:param cmd: Command to execute. | ||
""" | ||
proc = subprocess.Popen( | ||
cmd, | ||
shell=True, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
universal_newlines=True, | ||
) | ||
(out, error) = proc.communicate() | ||
exit_code = proc.wait() | ||
return exit_code, out, error | ||
|
||
|
||
def modify_file_content(file_name): | ||
with open(file_name, "r") as frb: | ||
logger.debug(f"Current content : {frb.readlines()}") | ||
|
||
with fileinput.FileInput(file_name, inplace=True, backup=".bak") as file: | ||
for line in file: | ||
print( | ||
line.replace( | ||
'SENSOR_TEMPERATURE_ENABLED: "false"', | ||
'SENSOR_TEMPERATURE_ENABLED: "true"', | ||
), | ||
end="", | ||
) | ||
|
||
with open(file_name, "r") as fra: | ||
contents = fra.readlines() | ||
logger.debug(f"Modified content : {contents}") | ||
|
||
return contents |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import logging | ||
import time | ||
|
||
import pytest | ||
|
||
from . import __loggername__ | ||
from .crd import DeploymentConfig | ||
|
||
logger = logging.getLogger(__loggername__) | ||
|
||
|
||
@pytest.mark.scale_image_generator | ||
def test_scale_image_generator(openshift_dyn_client): | ||
name = "image-generator" | ||
replicas = 1 | ||
body = {"spec": {"replicas": replicas}, "metadata": {"name": name}} | ||
|
||
logger.info("Check current replicas for image-generator deploymentconfig") | ||
dc = DeploymentConfig.get( | ||
dyn_client=openshift_dyn_client, namespace="xraylab-1", name=name | ||
) | ||
dc = next(dc) | ||
|
||
if int(dc.instance.status.replicas) != 0: | ||
err_msg = "Expected 0 replicas for image-generator deploymentconfig" | ||
logger.error(f"FAIL: {err_msg}") | ||
assert False, err_msg | ||
else: | ||
logger.info(f"Replicas found: {dc.instance.status.replicas}") | ||
|
||
logger.info("Scale image-generator deploymentconfig") | ||
dc.update(resource_dict=body) | ||
|
||
logger.info("Wait for image-generator pod") | ||
timeout = time.time() + 120 | ||
while time.time() < timeout: | ||
time.sleep(5) | ||
dc = DeploymentConfig.get( | ||
dyn_client=openshift_dyn_client, namespace="xraylab-1", name=name | ||
) | ||
dc = next(dc) | ||
if int(dc.instance.status.replicas) == replicas: | ||
break | ||
|
||
if int(dc.instance.status.replicas) != replicas: | ||
err_msg = f"Expected {replicas} replicas for {name} deploymentconfig" | ||
logger.error(f"FAIL: {err_msg}") | ||
assert False, err_msg | ||
else: | ||
logger.info("PASS: Scale image-generator test passed") |
Oops, something went wrong.