Skip to content

Commit

Permalink
feat(anta.cli): Make CV certs verified by default (#700)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Carl Baillargeon <[email protected]>
  • Loading branch information
gmuloc and carl-baillargeon authored Jun 10, 2024
1 parent 59bef03 commit e5222d1
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 34 deletions.
22 changes: 18 additions & 4 deletions anta/cli/get/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import TYPE_CHECKING, Any

import click
import requests
from cvprac.cvp_client import CvpClient
from cvprac.cvp_client_errors import CvpApiError
from rich.pretty import pretty_repr
Expand All @@ -36,14 +37,27 @@
@click.option("--username", "-u", help="CloudVision username", type=str, required=True)
@click.option("--password", "-p", help="CloudVision password", type=str, required=True)
@click.option("--container", "-c", help="CloudVision container where devices are configured", type=str)
def from_cvp(ctx: click.Context, output: Path, host: str, username: str, password: str, container: str | None) -> None:
@click.option(
"--ignore-cert",
help="Ignore verifying the SSL certificate when connecting to CloudVision",
show_envvar=True,
is_flag=True,
default=False,
)
def from_cvp(ctx: click.Context, output: Path, host: str, username: str, password: str, container: str | None, *, ignore_cert: bool) -> None:
# pylint: disable=too-many-arguments
"""Build ANTA inventory from Cloudvision.
"""Build ANTA inventory from CloudVision.
TODO - handle get_inventory and get_devices_in_container failure
NOTE: Only username/password authentication is supported for on-premises CloudVision instances.
Token authentication for both on-premises and CloudVision as a Service (CVaaS) is not supported.
"""
# TODO: - Handle get_cv_token, get_inventory and get_devices_in_container failures.
logger.info("Getting authentication token for user '%s' from CloudVision instance '%s'", username, host)
token = get_cv_token(cvp_ip=host, cvp_username=username, cvp_password=password)
try:
token = get_cv_token(cvp_ip=host, cvp_username=username, cvp_password=password, verify_cert=not ignore_cert)
except requests.exceptions.SSLError as error:
logger.error("Authentication to CloudVison failed: %s.", error)
ctx.exit(ExitCode.USAGE_ERROR)

clnt = CvpClient()
try:
Expand Down
27 changes: 22 additions & 5 deletions anta/cli/get/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,24 +77,41 @@ def wrapper(
return wrapper


def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str) -> str:
"""Generate AUTH token from CVP using password."""
# TODO: need to handle requests error
def get_cv_token(cvp_ip: str, cvp_username: str, cvp_password: str, *, verify_cert: bool) -> str:
"""Generate the authentication token from CloudVision using username and password.
TODO: need to handle requests error
Args:
----
cvp_ip: IP address of CloudVision.
cvp_username: Username to connect to CloudVision.
cvp_password: Password to connect to CloudVision.
verify_cert: Enable or disable certificate verification when connecting to CloudVision.
Returns
-------
token(str): The token to use in further API calls to CloudVision.
Raises
------
requests.ssl.SSLError: If the certificate verification fails
"""
# use CVP REST API to generate a token
url = f"https://{cvp_ip}/cvpservice/login/authenticate.do"
payload = json.dumps({"userId": cvp_username, "password": cvp_password})
headers = {"Content-Type": "application/json", "Accept": "application/json"}

response = requests.request("POST", url, headers=headers, data=payload, verify=False, timeout=10)
response = requests.request("POST", url, headers=headers, data=payload, verify=verify_cert, timeout=10)
return response.json()["sessionId"]


def write_inventory_to_file(hosts: list[AntaInventoryHost], output: Path) -> None:
"""Write a file inventory from pydantic models."""
i = AntaInventoryInput(hosts=hosts)
with output.open(mode="w", encoding="UTF-8") as out_fd:
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: i.model_dump(exclude_unset=True)}))
out_fd.write(yaml.dump({AntaInventory.INVENTORY_ROOT_KEY: yaml.safe_load(i.yaml())}))
logger.info("ANTA inventory file has been created: '%s'", output)


Expand Down
15 changes: 15 additions & 0 deletions anta/inventory/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
from __future__ import annotations

import logging
import math

import yaml
from pydantic import BaseModel, ConfigDict, IPvAnyAddress, IPvAnyNetwork

from anta.custom_types import Hostname, Port
Expand Down Expand Up @@ -82,3 +84,16 @@ class AntaInventoryInput(BaseModel):
networks: list[AntaInventoryNetwork] | None = None
hosts: list[AntaInventoryHost] | None = None
ranges: list[AntaInventoryRange] | None = None

def yaml(self) -> str:
"""Return a YAML representation string of this model.
Returns
-------
The YAML representation string of this model.
"""
# TODO: Pydantic and YAML serialization/deserialization is not supported natively.
# This could be improved.
# https://github.com/pydantic/pydantic/issues/1043
# Explore if this worth using this: https://github.com/NowanIlfideme/pydantic-yaml
return yaml.safe_dump(yaml.safe_load(self.model_dump_json(serialize_as_any=True, exclude_unset=True)), indent=2, width=math.inf)
27 changes: 19 additions & 8 deletions docs/cli/inv-from-cvp.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,32 @@

In large setups, it might be beneficial to construct your inventory based on CloudVision. The `from-cvp` entrypoint of the `get` command enables the user to create an ANTA inventory from CloudVision.

!!! info
The current implementation only works with on-premises CloudVision instances, not with CloudVision as a Service (CVaaS).

### Command overview

```bash
anta get from-cvp --help
Usage: anta get from-cvp [OPTIONS]

Build ANTA inventory from Cloudvision
Build ANTA inventory from CloudVision.

NOTE: Only username/password authentication is supported for on-premises CloudVision instances.
Token authentication for both on-premises and CloudVision as a Service (CVaaS) is not supported.

Options:
-ip, --cvp-ip TEXT CVP IP Address [required]
-u, --cvp-username TEXT CVP Username [required]
-p, --cvp-password TEXT CVP Password / token [required]
-c, --cvp-container TEXT Container where devices are configured
-d, --inventory-directory PATH Path to save inventory file
--help Show this message and exit.
-o, --output FILE Path to save inventory file [env var: ANTA_INVENTORY;
required]
--overwrite Do not prompt when overriding current inventory [env
var: ANTA_GET_FROM_CVP_OVERWRITE]
-host, --host TEXT CloudVision instance FQDN or IP [required]
-u, --username TEXT CloudVision username [required]
-p, --password TEXT CloudVision password [required]
-c, --container TEXT CloudVision container where devices are configured
--ignore-cert By default connection to CV will use HTTPS
certificate, set this flag to disable it [env var:
ANTA_GET_FROM_CVP_IGNORE_CERT]
--help Show this message and exit.
```

The output is an inventory where the name of the container is added as a tag for each host:
Expand Down
48 changes: 34 additions & 14 deletions tests/units/cli/get/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from unittest.mock import ANY, patch

import pytest
import requests
from cvprac.cvp_client_errors import CvpApiError

from anta.cli._main import anta
Expand All @@ -24,19 +25,25 @@


@pytest.mark.parametrize(
("cvp_container", "cvp_connect_failure"),
("cvp_container", "verify_cert", "cv_token_failure", "cvp_connect_failure"),
[
pytest.param(None, False, id="all devices"),
pytest.param("custom_container", False, id="custom container"),
pytest.param(None, True, id="cvp connect failure"),
pytest.param(None, True, False, False, id="all devices - verify cert"),
pytest.param(None, True, True, False, id="all devices - fail SSL check"),
pytest.param(None, False, False, False, id="all devices - do not verify cert"),
pytest.param("custom_container", False, False, False, id="custom container"),
pytest.param(None, False, False, True, id="cvp connect failure"),
],
)
def test_from_cvp(
tmp_path: Path,
click_runner: CliRunner,
cvp_container: str | None,
verify_cert: bool,
cv_token_failure: bool,
cvp_connect_failure: bool,
) -> None:
# pylint: disable=too-many-arguments
# ruff: noqa: C901
"""Test `anta get from-cvp`.
This test verifies that username and password are NOT mandatory to run this command
Expand All @@ -57,14 +64,20 @@ def test_from_cvp(

if cvp_container is not None:
cli_args.extend(["--container", cvp_container])
if not verify_cert:
cli_args.extend(["--ignore-cert"])

def mock_get_cv_token(*_args: str, **_kwargs: str) -> None:
if cv_token_failure:
raise requests.exceptions.SSLError

def mock_cvp_connect(_self: CvpClient, *_args: str, **_kwargs: str) -> None:
if cvp_connect_failure:
raise CvpApiError(msg="mocked CvpApiError")

# always get a token
with (
patch("anta.cli.get.commands.get_cv_token", return_value="dummy_token"),
patch("anta.cli.get.commands.get_cv_token", autospec=True, side_effect=mock_get_cv_token),
patch(
"cvprac.cvp_client.CvpClient.connect",
autospec=True,
Expand All @@ -79,20 +92,27 @@ def mock_cvp_connect(_self: CvpClient, *_args: str, **_kwargs: str) -> None:
):
result = click_runner.invoke(anta, cli_args)

if not cvp_connect_failure:
if not cvp_connect_failure and not cv_token_failure:
assert output.exists()

if cv_token_failure:
assert "Authentication to CloudVison failed" in result.output
assert result.exit_code == ExitCode.USAGE_ERROR
return

mocked_cvp_connect.assert_called_once()
if not cvp_connect_failure:
assert "Connected to CloudVision" in result.output
if cvp_container is not None:
mocked_get_devices_in_container.assert_called_once_with(ANY, cvp_container)
else:
mocked_get_inventory.assert_called_once()
assert result.exit_code == ExitCode.OK
else:

if cvp_connect_failure:
assert "Error connecting to CloudVision" in result.output
assert result.exit_code == ExitCode.USAGE_ERROR
return

assert "Connected to CloudVision" in result.output
if cvp_container is not None:
mocked_get_devices_in_container.assert_called_once_with(ANY, cvp_container)
else:
mocked_get_inventory.assert_called_once()
assert result.exit_code == ExitCode.OK


@pytest.mark.parametrize(
Expand Down
13 changes: 10 additions & 3 deletions tests/units/cli/get/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@
DATA_DIR: Path = Path(__file__).parents[3].resolve() / "data"


def test_get_cv_token() -> None:
@pytest.mark.parametrize(
"verify_cert",
[
pytest.param(True, id="Verify cert enabled"),
pytest.param(False, id="Verify cert disabled"),
],
)
def test_get_cv_token(verify_cert: bool) -> None:
"""Test anta.get.utils.get_cv_token."""
ip_addr = "42.42.42.42"
username = "ant"
Expand All @@ -29,13 +36,13 @@ def test_get_cv_token() -> None:
mocked_ret = MagicMock(autospec=requests.Response)
mocked_ret.json.return_value = {"sessionId": "simple"}
patched_request.return_value = mocked_ret
res = get_cv_token(ip_addr, username, password)
res = get_cv_token(ip_addr, username, password, verify_cert=verify_cert)
patched_request.assert_called_once_with(
"POST",
"https://42.42.42.42/cvpservice/login/authenticate.do",
headers={"Content-Type": "application/json", "Accept": "application/json"},
data='{"userId": "ant", "password": "formica"}',
verify=False,
verify=verify_cert,
timeout=10,
)
assert res == "simple"
Expand Down

0 comments on commit e5222d1

Please sign in to comment.