diff --git a/DEVELOPING.md b/DEVELOPING.md
index 3cbcfc39..4be1e4c5 100644
--- a/DEVELOPING.md
+++ b/DEVELOPING.md
@@ -300,6 +300,7 @@ Possible vulnerability schemas supported within the vunnel repo are:
 - [Generic OS Vulnerability](https://github.com/anchore/vunnel/tree/main/schema/vulnerability/os)
 - [GitHub Security Advisories](https://github.com/anchore/vunnel/tree/main/schema/vulnerability/github-security-advisory)
 - [NVD Vulnerability](https://github.com/anchore/vunnel/tree/main/schema/vulnerability/nvd)
+- [Open Source Vulnerability (OSV)](https://ossf.github.io/osv-schema)
 
 If at any point a breaking change needs to be made to a provider (and say the schema remains the same), then you can set
 the `__version__` attribute on the provider class to a new integer value (incrementing from `1` onwards). This
@@ -373,6 +374,7 @@ All results must conform to a [particular schema](https://github.com/anchore/vun
 - `os`: a generic operating system vulnerability (e.g redhat, debian, ubuntu, alpine, wolfi, etc.)
 - `nvd`: tailored to describe vulnerabilities from the NVD
 - `github-security-advisory`: tailored to describe vulnerabilities from GitHub
+- `osv`: tailored to describe vulnerabilities from the [aggregated OSV vulnerability database](https://osv.dev/list)
 
 Once the provider is implemented, you will need to wire it up into the application in a couple places:
 - add a new entry under the dispatch table in `src/vunnel/providers/__init__.py` mapping your provider name to the class
diff --git a/src/vunnel/cli/config.py b/src/vunnel/cli/config.py
index d36b919f..730fb49b 100644
--- a/src/vunnel/cli/config.py
+++ b/src/vunnel/cli/config.py
@@ -44,6 +44,7 @@ class CommonProviderConfig:
 class Providers:
     alpine: providers.alpine.Config = field(default_factory=providers.alpine.Config)
     amazon: providers.amazon.Config = field(default_factory=providers.amazon.Config)
+    bitnami: providers.bitnami.Config = field(default_factory=providers.bitnami.Config)
     chainguard: providers.chainguard.Config = field(default_factory=providers.chainguard.Config)
     debian: providers.debian.Config = field(default_factory=providers.debian.Config)
     github: providers.github.Config = field(default_factory=providers.github.Config)
diff --git a/src/vunnel/providers/__init__.py b/src/vunnel/providers/__init__.py
index 73498e01..210319aa 100644
--- a/src/vunnel/providers/__init__.py
+++ b/src/vunnel/providers/__init__.py
@@ -7,6 +7,7 @@
 from vunnel.providers import (
     alpine,
     amazon,
+    bitnami,
     chainguard,
     debian,
     github,
@@ -30,6 +31,7 @@
 _providers: dict[str, type[provider.Provider]] = {
     alpine.Provider.name(): alpine.Provider,
     amazon.Provider.name(): amazon.Provider,
+    bitnami.Provider.name(): bitnami.Provider,
     debian.Provider.name(): debian.Provider,
     github.Provider.name(): github.Provider,
     mariner.Provider.name(): mariner.Provider,
diff --git a/src/vunnel/providers/bitnami/__init__.py b/src/vunnel/providers/bitnami/__init__.py
new file mode 100644
index 00000000..72004a76
--- /dev/null
+++ b/src/vunnel/providers/bitnami/__init__.py
@@ -0,0 +1,63 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING
+
+from vunnel import provider, result, schema
+
+from .parser import Parser
+
+if TYPE_CHECKING:
+    import datetime
+
+
+@dataclass
+class Config:
+    runtime: provider.RuntimeConfig = field(
+        default_factory=lambda: provider.RuntimeConfig(
+            result_store=result.StoreStrategy.SQLITE,
+            existing_results=provider.ResultStatePolicy.DELETE_BEFORE_WRITE,
+        ),
+    )
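+    # NOTE: mirrors the common provider config shape; the git-based parser below
+    # does not currently use this value.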
+    request_timeout: int = 125
+
+
+class Provider(provider.Provider):
+
+    __schema__ = schema.OSVSchema()
+    __distribution_version__ = int(__schema__.major_version)
+
+    def __init__(self, root: str, config: Config | None = None):
+        if not config:
+            config = Config()
+
+        super().__init__(root, runtime_cfg=config.runtime)
+        self.config = config
+        self.logger.debug(f"config: {config}")
+
+        self.schema = self.__schema__
+        self.parser = Parser(
+            ws=self.workspace,
+            logger=self.logger,
+        )
+
+        # this provider requires the previous state from former runs
+        provider.disallow_existing_input_policy(config.runtime)
+
+    @classmethod
+    def name(cls) -> str:
+        return "bitnami"
+
+    def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]:
+
+        # TODO: use last_updated (as the NVD provider does) to avoid downloading all
+        # vulnerability data from the source and to make incremental updates instead
+        with self.results_writer() as writer:
+            for vuln_id, record in self.parser.get():
+                writer.write(
+                    identifier=vuln_id.lower(),
+                    schema=self.schema,
+                    payload=record,
+                )
+
+        return self.parser.urls, len(writer)
diff --git a/src/vunnel/providers/bitnami/git.py b/src/vunnel/providers/bitnami/git.py
new file mode 100644
index 00000000..1290b952
--- /dev/null
+++ b/src/vunnel/providers/bitnami/git.py
@@ -0,0 +1,95 @@
+from __future__ import annotations
+
+import logging
+import os
+import shlex
+import shutil
+import subprocess
+import tempfile
+from dataclasses import dataclass
+
+from vunnel import utils
+
+
+@dataclass
+class GitRevision:
+    sha: str
+    file: str
+
+
+class GitWrapper:
+    _check_cmd_ = "git --version"
+    _is_git_repo_cmd_ = "git rev-parse --is-inside-work-tree"
+    _clone_cmd_ = "git clone -b {branch} {src} {dest}"
+    _check_out_cmd_ = "git checkout {branch}"
+
+    def __init__(
+        self,
+        source: str,
+        branch: str,
+        checkout_dest: str,
+        logger: logging.Logger | None = None,
+    ):
+        self.src = source
+        self.branch = branch
+        self.dest = checkout_dest
+        self.workspace = tempfile.gettempdir()
+
+        if not logger:
+            logger = logging.getLogger(self.__class__.__name__)
+        self.logger = logger
+
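+        # fail fast at construction time if a usable git executable is not on the PATH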
+        try:
+            out = self._exec_cmd(self._check_cmd_)
+            self.logger.trace(f"git executable verified using cmd: {self._check_cmd_}, output: {out.decode()}")
+        except Exception:
+            self.logger.exception('could not find required "git" executable. Please install git on the host')
+            raise
+
+    def _check(self, destination):
+        try:
+            if not os.path.exists(destination):
+                self.logger.debug(f"git working tree not found at {destination}")
+                return False
+
+            cmd = self._is_git_repo_cmd_
+            out = self._exec_cmd(cmd, cwd=destination)
+            self.logger.debug(f"check for git repository, cmd: {cmd}, output: {out.decode()}")
+        except Exception:
+            self.logger.debug(f"git working tree not found at {destination}", exc_info=True)
+            return False
+
+        return True
+
+    def delete_repo(self):
+        if os.path.exists(self.dest):
+            self.logger.debug("deleting existing repository")
+            shutil.rmtree(self.dest, ignore_errors=True)
+
+    @utils.retry_with_backoff()
+    def clone_repo(self):
+        try:
+            self.logger.info(f"cloning git repository {self.src} branch {self.branch} to {self.dest}")
+            cmd = self._clone_cmd_.format(src=self.src, dest=self.dest, branch=self.branch)
+            out = self._exec_cmd(cmd)
+            self.logger.debug(f"initialized git repo, cmd: {cmd}, output: {out.decode()}")
+        except Exception:
+            self.logger.exception(f"failed to clone git repository {self.src} branch {self.branch} to {self.dest}")
+            raise
+
+    def _exec_cmd(self, cmd, *args, **kwargs) -> bytes:
+        """
+        Run a command, raising on failure and logging any error
+        :param cmd: the command as a single string (e.g. 'git --version'), split with shlex before execution
+        :param args: additional positional arguments passed through to subprocess.check_output
+        :param kwargs: additional keyword arguments passed through to subprocess.check_output
+        :return: the command's stdout as bytes
+        """
+        try:
+            self.logger.trace(f"running: {cmd}")
+            cmd_list = shlex.split(cmd)
+            # S603 disable explanation: running git commands by design
+            return subprocess.check_output(cmd_list, *args, **kwargs, stderr=subprocess.PIPE)  # noqa: S603
+        except Exception:
+            self.logger.exception(f"error executing command: {cmd}")
+            raise
diff --git a/src/vunnel/providers/bitnami/parser.py b/src/vunnel/providers/bitnami/parser.py
new file mode 100644
index 00000000..206da1d7
--- /dev/null
+++ b/src/vunnel/providers/bitnami/parser.py
@@ -0,0 +1,101 @@
+from __future__ import annotations
+
+import logging
+import os
+from typing import TYPE_CHECKING
+
+import orjson
+
+if TYPE_CHECKING:
+    from vunnel.workspace import Workspace
+
+from .git import GitWrapper
+
+namespace = "bitnami"
+
+
+class Parser:
+    _git_src_url_ = "https://github.com/bitnami/vulndb.git"
+    _git_src_branch_ = "main"
+
+    def __init__(self, ws: Workspace, logger: logging.Logger | None = None):
+        self.workspace = ws
+        self.git_url = self._git_src_url_
+        self.git_branch = self._git_src_branch_
+        self.urls = [self.git_url]
+        if not logger:
+            logger = logging.getLogger(self.__class__.__name__)
+        self.logger = logger
+        _checkout_dst_ = os.path.join(self.workspace.input_path, "vulndb")
+        self.git_wrapper = GitWrapper(
+            source=self.git_url,
+            branch=self.git_branch,
+            checkout_dest=_checkout_dst_,
+            logger=self.logger,
+        )
+
+    def _load(self):
+        self.logger.info("loading data from git repository")
+
+        vuln_data_dir = os.path.join(self.workspace.input_path, "vulndb", "data")
+        for root, dirs, files in os.walk(vuln_data_dir):
+            dirs.sort()
+            for file in sorted(files):
+                full_path = os.path.join(root, file)
+                with open(full_path, encoding="utf-8") as f:
+                    yield orjson.loads(f.read())
+
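+    # Illustrative example with hypothetical data (not a real Bitnami advisory): an OSV
+    # entry shaped like
+    #   {"id": "BIT-example-2023-0001", "aliases": ["CVE-2023-0001"],
+    #    "details": "example description", "database_specific": {"severity": "High"},
+    #    "references": [{"type": "WEB", "url": "https://example.com/advisory"}],
+    #    "affected": [{"package": {"name": "example"},
+    #                  "ranges": [{"type": "SEMVER",
+    #                              "events": [{"introduced": "0"}, {"fixed": "1.2.3"}]}]}]}
+    # is normalized by the method below into
+    #   ("CVE-2023-0001", {"Vulnerability": {"Name": "CVE-2023-0001", "NamespaceName": "bitnami",
+    #    "Link": "https://example.com/advisory", "Severity": "High",
+    #    "Description": "example description",
+    #    "FixedIn": [{"Name": "example", "VersionFormat": "semver",
+    #                 "NamespaceName": "bitnami", "Version": "1.2.3"}]}})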
+    def _normalize(self, vuln_entry):
+        self.logger.info("normalizing vulnerability data")
+
+        vuln_id = vuln_entry["id"]
+        if "aliases" in vuln_entry and len(vuln_entry["aliases"]) > 0:
+            vuln_id = vuln_entry["aliases"][0]
+        fixed_in = []
+        if "affected" in vuln_entry:
+            for affected in vuln_entry["affected"]:
+                version = "None"
+                if "ranges" in affected:
+                    for r in affected["ranges"]:
+                        if "events" in r:
+                            for event in r["events"]:
+                                # TODO: handle "last_affected" events as well, e.g.:
+                                # if "last_affected" in event:
+                                #     version = event["last_affected"]
+                                #     break
+                                if "fixed" in event:
+                                    version = event["fixed"]
+                                    break
+
+                fixed_in.append(
+                    {
+                        "Name": affected["package"]["name"],
+                        "VersionFormat": "semver",
+                        "NamespaceName": namespace,
+                        "Version": version,
+                    },
+                )
+        link = "None"
+        if "references" in vuln_entry and len(vuln_entry["references"]) > 0:
+            # OSV references are objects of the form {"type": ..., "url": ...}
+            link = vuln_entry["references"][0]["url"]
+
+        return vuln_id, {
+            "Vulnerability": {
+                "Name": vuln_id,
+                "NamespaceName": namespace,
+                "Link": link,
+                "Severity": vuln_entry["database_specific"]["severity"],
+                "Description": vuln_entry["details"],
+                "FixedIn": fixed_in,
+            },
+        }
+
+    def get(self):
+        # Initialize the git repository
+        self.git_wrapper.delete_repo()
+        self.git_wrapper.clone_repo()
+
+        # Load the data from the git repository
+        for vuln_entry in self._load():
+            # Normalize the loaded data
+            yield self._normalize(vuln_entry)
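
With the dispatch table and CLI config entries above in place, the provider can be exercised locally via `vunnel run bitnami` (assuming the usual `vunnel run <provider>` entrypoint); each run deletes and re-clones `bitnami/vulndb` into the workspace `input` directory and writes one result per normalized advisory through the configured SQLite result store.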