diff --git a/src/cve/data_models/cve_intel.py b/src/cve/data_models/cve_intel.py
index 50c0aa5..6a06c3b 100644
--- a/src/cve/data_models/cve_intel.py
+++ b/src/cve/data_models/cve_intel.py
@@ -14,14 +14,35 @@
 # limitations under the License.
 
 
+import abc
 import typing
 
+from typing_extensions import Self
 from pydantic import BaseModel
+from pydantic import computed_field
 from pydantic import ConfigDict
 from pydantic import Field
 
 
-class CveIntelGhsa(BaseModel):
+class IntelSource(BaseModel, abc.ABC):
+    """
+    Abstract base class for a model holding CVE intel from a single source.
+    """
+
+    @property
+    @abc.abstractmethod
+    def intel_sufficient(self) -> bool:
+        """
+        Logic to determine if the CVE has sufficient intel for this particular intel source.
+
+        Returns
+        -------
+        bool
+            True if enough intel from this source has been found
+        """
+
+
+class CveIntelGhsa(IntelSource):
     """
     Information about a GHSA (GitHub Security Advisory) entry.
     """
@@ -46,8 +67,14 @@ class CWE(BaseModel):
     published_at: str | None = None
     updated_at: str | None = None
 
+    @property
+    def intel_sufficient(self) -> bool:
+        """True if the advisory has a description, a summary, or any listed vulnerabilities."""
+        has_vuln_info = bool(self.vulnerabilities)
+        return self.description is not None or self.summary is not None or has_vuln_info
+
 
-class CveIntelNvd(BaseModel):
+class CveIntelNvd(IntelSource):
     """
     Information about an NVD (National Vulnerability Database) entry.
     """
@@ -77,8 +104,14 @@ class Configuration(BaseModel):
     published_at: str | None = None
     updated_at: str | None = None
 
+    @property
+    def intel_sufficient(self) -> bool:
+        """True if the entry has a CVE description, a CWE description, or any configurations."""
+        has_vuln_info = bool(self.configurations)
+        return self.cve_description is not None or self.cwe_description is not None or has_vuln_info
+
 
-class CveIntelRhsa(BaseModel):
+class CveIntelRhsa(IntelSource):
     """
     Information about a RHSA (Red Hat Security Advisory) entry.
     """
@@ -107,8 +140,15 @@ class CVSS3(BaseModel):
     upstream_fix: str | None = None
     cvss3: CVSS3 | None = None
 
+    @property
+    def intel_sufficient(self) -> bool:
+        """True if the advisory has a Bugzilla description or any package state entries."""
+        # Guard against a missing Bugzilla record instead of raising AttributeError.
+        has_description = self.bugzilla is not None and self.bugzilla.description is not None
+        return has_description or bool(self.package_state)
+
 
-class CveIntelUbuntu(BaseModel):
+class CveIntelUbuntu(IntelSource):
     """
     Information about a Ubuntu CVE entry.
     """
@@ -142,12 +182,19 @@ class Impact(BaseModel):
 
     description: str | None = None
     notes: list[Note] | None = None
+    notices: list | None = None
     priority: str | None = None
     ubuntu_description: str | None = None
     impact: Impact | None = None
 
+    @property
+    def intel_sufficient(self) -> bool:
+        """True if the entry has a description, an Ubuntu description, or any notices."""
+        has_vuln_info = bool(self.notices)
+        return self.description is not None or self.ubuntu_description is not None or has_vuln_info
+
 
-class CveIntelEpss(BaseModel):
+class CveIntelEpss(IntelSource):
     """
     Information about an EPSS (Elastic Product Security Service) entry.
     """
@@ -157,6 +204,11 @@ class CveIntelEpss(BaseModel):
     percentile: float | None = None
     date: str | None = None
 
+    @property
+    def intel_sufficient(self) -> bool:
+        """Always False: an EPSS entry on its own is never counted as sufficient intel."""
+        return False
+
 
 class CveIntel(BaseModel):
     """
@@ -174,6 +226,21 @@ class CveIntel(BaseModel):
     ubuntu: CveIntelUbuntu | None = None
     epss: CveIntelEpss | None = None
 
+    @computed_field()
+    @property
+    def has_sufficient_intel(self) -> bool:
+        """
+        Logic to determine if the CVE has sufficient intel and can be passed to the agent.
+
+        Returns
+        -------
+        bool
+            True if at least one populated intel source reports sufficient intel for the CVE
+        """
+        # Read the field names from the class; unset sources are None and fail the isinstance check.
+        sources = (getattr(self, field_name) for field_name in type(self).model_fields)
+        return any(isinstance(source, IntelSource) and source.intel_sufficient for source in sources)
+
     @property
     def cve_id(self):
         """
diff --git a/src/cve/pipeline/pipeline.py b/src/cve/pipeline/pipeline.py
index e5cf872..07dee3e 100644
--- a/src/cve/pipeline/pipeline.py
+++ b/src/cve/pipeline/pipeline.py
@@ -119,12 +119,16 @@ def convert_input_to_df(message: AgentMorpheusEngineInput) -> ControlMessage:
         len(v.vuln_package_intel_sources) > 0 for v in message.info.vulnerable_dependencies
     ]
 
+    has_sufficient_intel_flags = full_df["has_sufficient_intel"]
+
     full_df["vulnerable_dependencies"] = vulnerable_dependencies
 
-    # Filter full_df by whether the CVE has vulnerable dependencies or lacks vulnerable package info from intel
+    # Keep a CVE if it has vulnerable dependencies, or if its intel lacks vulnerable package info but is
+    # otherwise sufficient for the agent
     filtered_df = full_df[[
-        len(vuln_deps) > 0 or not has_vuln_package_info for vuln_deps,
-        has_vuln_package_info in zip(vulnerable_dependencies, has_vuln_package_info_flags)
+        len(vuln_deps) > 0 or (not has_vuln_package_info and has_sufficient_intel)
+        for vuln_deps, has_vuln_package_info, has_sufficient_intel in zip(
+            vulnerable_dependencies, has_vuln_package_info_flags, has_sufficient_intel_flags)
     ]]
 
     # Convert pandas to cudf
diff --git a/src/cve/stages/convert_to_output_object.py b/src/cve/stages/convert_to_output_object.py
index 149bb65..d3d3254 100644
--- a/src/cve/stages/convert_to_output_object.py
+++ b/src/cve/stages/convert_to_output_object.py
@@ -79,7 +79,7 @@ def _parse_agent_morpheus_engine_output(row: dict) -> AgentMorpheusEngineOutput:
                                      justification=justification_output)
 
 
-def _get_placeholder_output(vuln_id: str) -> AgentMorpheusEngineOutput:
+def _get_no_vuln_packages_output(vuln_id: str) -> AgentMorpheusEngineOutput:
     SUMMARY = "The VulnerableDependencyChecker did not find any vulnerable packages or dependencies in the SBOM."
     JUSTIFICATION = JustificationOutput(label="code_not_present",
                                         reason="No vulnerable packages or dependencies were detected in the SBOM.",
@@ -97,6 +97,23 @@ def _get_placeholder_output(vuln_id: str) -> AgentMorpheusEngineOutput:
         justification=JUSTIFICATION)
 
 
+def _get_deficient_intel_output(vuln_id: str) -> AgentMorpheusEngineOutput:
+    """Build the placeholder output for a CVE that lacks enough intel for the agent to analyze."""
+    SUMMARY = ("There is insufficient intel available to determine vulnerability. "
+               "This is either due to the CVE not existing or there is not enough gathered intel for the agent "
+               "to make an informed decision.")
+    JUSTIFICATION = JustificationOutput(label="insufficient_intel",
+                                        reason="Insufficient intel available for CVE",
+                                        status="UNKNOWN")
+    return AgentMorpheusEngineOutput(
+        vuln_id=vuln_id,
+        checklist=[
+            ChecklistItemOutput(input="Gather intel for the CVE.", response=SUMMARY, intermediate_steps=None)
+        ],
+        summary=SUMMARY,
+        justification=JUSTIFICATION)
+
+
 @stage
 def convert_to_output_object(message: ControlMessage) -> AgentMorpheusOutput:
     """
@@ -113,10 +130,14 @@ def convert_to_output_object(message: ControlMessage) -> AgentMorpheusOutput:
     sbom: AgentMorpheusInfo.SBOMInfo = message.get_metadata("info.sbom")
     vulnerable_dependencies: list[VulnerableDependencies] = message.get_metadata("info.vulnerable_dependencies")
 
-    filtered_vulns = [
+    no_vulns = [
         vuln_dep.vuln_id for vuln_dep in vulnerable_dependencies if len(vuln_dep.vulnerable_sbom_packages) == 0
     ]
 
+    deficient_intel = [
+        cve_intel.get_cve_id() for cve_intel in intel if not cve_intel.has_sufficient_intel
+    ]
+
     # Extract LLMEngine output from message df to dict of {vuln_id: row}
     with message.payload().mutable_dataframe() as df:
         df2 = df.set_index("vuln_id", drop=False)
@@ -131,10 +152,10 @@ def convert_to_output_object(message: ControlMessage) -> AgentMorpheusOutput:
 
         if vuln_id in llm_engine_output:
             output.append(_parse_agent_morpheus_engine_output(llm_engine_output[vuln_id]))
-
-        elif vuln_id in filtered_vulns:
-            output.append(_get_placeholder_output(vuln_id))
-
+        elif vuln_id in deficient_intel:
+            output.append(_get_deficient_intel_output(vuln_id))
+        elif vuln_id in no_vulns:
+            output.append(_get_no_vuln_packages_output(vuln_id))
         else:
             assert False, "CVE has vulnerable dependencies but there is no LLMEngine output."
 