Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deficient intel filtering #57

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 66 additions & 5 deletions src/cve/data_models/cve_intel.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,32 @@
# limitations under the License.


import abc
import typing
from typing_extensions import Self

from pydantic import BaseModel
from pydantic import computed_field
from pydantic import ConfigDict
from pydantic import Field


class CveIntelGhsa(BaseModel):
class IntelSource(BaseModel, abc.ABC):
    """
    Abstract pydantic base for intel models that must report their own sufficiency.
    """

    @property
    @abc.abstractmethod
    def intel_sufficient(self) -> bool:
        """
        Report whether this particular intel source holds enough information about the CVE.

        Returns
        -------
        bool
            True if enough intel from this source has been found
        """


class CveIntelGhsa(IntelSource):
"""
Information about a GHSA (GitHub Security Advisory) entry.
"""
Expand All @@ -46,8 +64,13 @@ class CWE(BaseModel):
published_at: str | None = None
updated_at: str | None = None

@property
def intel_sufficient(self) -> bool:
    """
    Determine whether this intel source carries enough information on its own.

    Returns
    -------
    bool
        True if a description, a summary, or at least one vulnerability entry is present
    """
    # Truthiness covers both the None case and the empty-collection case.
    has_vuln_info = bool(self.vulnerabilities)
    return self.description is not None or self.summary is not None or has_vuln_info


class CveIntelNvd(BaseModel):
class CveIntelNvd(IntelSource):
"""
Information about an NVD (National Vulnerability Database) entry.
"""
Expand Down Expand Up @@ -77,8 +100,13 @@ class Configuration(BaseModel):
published_at: str | None = None
updated_at: str | None = None

@property
def intel_sufficient(self) -> bool:
    """
    Determine whether this intel source carries enough information on its own.

    Returns
    -------
    bool
        True if a CVE description, a CWE description, or at least one configuration is present
    """
    # Truthiness covers both the None case and the empty-collection case.
    has_vuln_info = bool(self.configurations)
    # Same condition as "not (all missing)", written without the double negation.
    return self.cve_description is not None or self.cwe_description is not None or has_vuln_info


class CveIntelRhsa(BaseModel):
class CveIntelRhsa(IntelSource):
"""
Information about a RHSA (Red Hat Security Advisory) entry.
"""
Expand Down Expand Up @@ -107,8 +135,13 @@ class CVSS3(BaseModel):
upstream_fix: str | None = None
cvss3: CVSS3 | None = None

@property
def intel_sufficient(self) -> bool:
    """
    Determine whether this intel source carries enough information on its own.

    Returns
    -------
    bool
        True if the bugzilla record has a description or at least one package state is present
    """
    # NOTE(review): whether `bugzilla` can be None is not visible from here; guard the nested
    # lookup so a missing record counts as "no description" instead of raising AttributeError.
    bugzilla = self.bugzilla
    has_description = bugzilla is not None and bugzilla.description is not None
    # Truthiness covers both the None case and the empty-collection case.
    has_vuln_info = bool(self.package_state)
    return has_description or has_vuln_info


class CveIntelUbuntu(BaseModel):
class CveIntelUbuntu(IntelSource):
"""
Information about a Ubuntu CVE entry.
"""
Expand Down Expand Up @@ -142,12 +175,18 @@ class Impact(BaseModel):

description: str | None = None
notes: list[Note] | None = None
notices: list | None = None
priority: str | None = None
ubuntu_description: str | None = None
impact: Impact | None = None

@property
def intel_sufficient(self) -> bool:
    """
    Determine whether this intel source carries enough information on its own.

    Returns
    -------
    bool
        True if a description, an Ubuntu description, or at least one notice is present
    """
    # Truthiness covers both the None case and the empty-collection case.
    has_vuln_info = bool(self.notices)
    # Same condition as "not (all missing)", written without the double negation.
    return self.description is not None or self.ubuntu_description is not None or has_vuln_info


class CveIntelEpss(BaseModel):
class CveIntelEpss(IntelSource):
"""
Information about an EPSS (Exploit Prediction Scoring System) entry.
"""
Expand All @@ -157,6 +196,10 @@ class CveIntelEpss(BaseModel):
percentile: float | None = None
date: str | None = None

@property
def intel_sufficient(self) -> bool:
    """
    Report whether this intel source carries enough information on its own.

    Returns
    -------
    bool
        Always False: data from this source alone never counts as sufficient intel
    """
    return False


class CveIntel(BaseModel):
"""
Expand All @@ -174,6 +217,24 @@ class CveIntel(BaseModel):
ubuntu: CveIntelUbuntu | None = None
epss: CveIntelEpss | None = None

@computed_field()
shawn-davis marked this conversation as resolved.
Show resolved Hide resolved
@property
def has_sufficient_intel(self) -> bool:
    """
    Logic to determine if the CVE has sufficient intel and can be passed to the agent.

    Every declared model field whose value is an `IntelSource` instance is consulted; the CVE
    is considered to have sufficient intel as soon as one of them reports `intel_sufficient`.

    Returns
    -------
    bool
        True if enough intel has been found for the CVE
    """
    # `model_fields` is read from the class; only the field names are needed here.
    field_values = (getattr(self, field_name) for field_name in type(self).model_fields)
    # isinstance() already rejects None (unset sources), so no separate None check is required.
    return any(source.intel_sufficient for source in field_values if isinstance(source, IntelSource))
Comment on lines +223 to +236
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that adding an intel_sufficient property to each intel class, each with a different definition, adds a lot of complexity, maybe more than we need to address the invalid CVE case.

What do you think of the simpler approach to just check if each intel object is None? Unfortunately this will require some changes to the intel retrieval to write a None value when the CVE doesn't exist. It seems like we do this for GHSA and NVD, but not the other sources.

        "intel": [
            {
                "vuln_id": "CVE-0000-00000",
                "ghsa": null,
                "nvd": null,
                "rhsa": {
                    "bugzilla": {
                        "description": null,
                        "id": null,
                        "url": null
                    },
                    "details": null,
                    "statement": null,
                    "package_state": null,
                    "upstream_fix": null,
                    "cvss3": null
                },
                "ubuntu": {
                    "description": null,
                    "notes": null,
                    "notices": null,
                    "priority": null,
                    "ubuntu_description": null,
                    "impact": null
                },
                "epss": {
                    "epss": null,
                    "percentile": null,
                    "date": null
                },
                "sufficient_intel": false
            }
        ],

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it was important to have direct control over the exact definition of sufficient detail. If the only source were EPSS (granted, an extremely unlikely scenario), that would not be sufficient for the agent to make an informed call on the validity. It also allows for explicit adjustment if the intel sources change.


@property
def cve_id(self):
"""
Expand Down
2 changes: 2 additions & 0 deletions src/cve/pipeline/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ def run_retrieval_qa_tool(retrieval_qa_tool: RetrievalQA, query: str) -> str | d

# Fall back to the default system prompt when the configured prompt is empty or unset.
sys_prompt = run_config.engine.agent.model.prompt or DEFAULT_SYS_PROMPT

# Initialize an agent with the tools and settings defined above.
# This agent is designed to handle zero-shot reaction descriptions and parse errors.
agent = initialize_agent(
Expand Down
6 changes: 4 additions & 2 deletions src/cve/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,14 @@ def convert_input_to_df(message: AgentMorpheusEngineInput) -> ControlMessage:
len(v.vuln_package_intel_sources) > 0 for v in message.info.vulnerable_dependencies
]

has_sufficient_intel_flags = full_df["has_sufficient_intel"]

full_df["vulnerable_dependencies"] = vulnerable_dependencies

# Filter full_df: keep CVEs that have vulnerable dependencies, or that lack vulnerable package info from intel but still have sufficient intel
# Build a boolean row mask. The zip must read `has_sufficient_intel_flags` (the name bound
# from the "has_sufficient_intel" column above); `sufficient_intel_flag` is never defined.
filtered_df = full_df[[
    len(vuln_deps) > 0 or (not has_vuln_package_info and has_sufficient_intel)
    for vuln_deps, has_vuln_package_info, has_sufficient_intel in zip(
        vulnerable_dependencies, has_vuln_package_info_flags, has_sufficient_intel_flags)
]]

# Convert pandas to cudf
Expand Down
34 changes: 28 additions & 6 deletions src/cve/stages/convert_to_output_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def _parse_agent_morpheus_engine_output(row: dict) -> AgentMorpheusEngineOutput:
justification=justification_output)


def _get_placeholder_output(vuln_id: str) -> AgentMorpheusEngineOutput:
def _get_no_vuln_packages_output(vuln_id: str) -> AgentMorpheusEngineOutput:
SUMMARY = "The VulnerableDependencyChecker did not find any vulnerable packages or dependencies in the SBOM."
JUSTIFICATION = JustificationOutput(label="code_not_present",
reason="No vulnerable packages or dependencies were detected in the SBOM.",
Expand All @@ -97,6 +97,24 @@ def _get_placeholder_output(vuln_id: str) -> AgentMorpheusEngineOutput:
justification=JUSTIFICATION)


def _get_deficient_intel_output(vuln_id: str) -> AgentMorpheusEngineOutput:
SUMMARY = "There is insufficient intel available to determine vulnerability. This is either due to the CVE not existing or there is not enough gathered intel for the agent to make an informed decision."
JUSTIFICATION = JustificationOutput(label="insufficient_intel",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use uncertain as the justification label? Using a standardized value in the CVEJustifyNode would prevent any downstream impacts of unexpected labels.

Suggested change
JUSTIFICATION = JustificationOutput(label="insufficient_intel",
JUSTIFICATION = JustificationOutput(label="uncertain",

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could label it whatever we want. I thought it was helpful to distinguish between the case where the LLM couldn't come up with a definitive label and the case where the LLM never even looked at this CVE (likely because it doesn't actually exist).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I do see the value of the label as well. I thought about backing into this info from the summary or the intel object, but this will take more work later on.

My main concern is about breaking any output contracts. Let's double check if the team has any concerns with adding a new justification label. If not, I'm good with it. We did also make another breaking change of adding a newline to the output, so we'll likely need to bump our version anyway.

reason="Insufficient intel available for CVE",
status="UNKNOWN")
return AgentMorpheusEngineOutput(
vuln_id=vuln_id,
checklist=[
ChecklistItemOutput(
input="Gather intel for the CVE.",
response=
"There is insufficient intel available to determine vulnerability. This is either due to the CVE not existing or there is not enough gathered intel for the agent to make an informed decision.",
intermediate_steps=None)
],
Comment on lines +107 to +113
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently these placeholder checklist items are only conditionally displayed when the agent is skipped, which has caused confusion for some users. We recently got a question about whether the SBOM check only happens when the "Check SBOM and dependencies for vulnerability" checklist item is shown.

Do you recall why we decided to include the placeholder checklist item? I'm wondering if we could safely omit it for consistency? This would also make it clearer that no checklist was generated by the model.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put those in to keep the output consistent. Makes parsing easier and ensures that we don't have a bunch of empty fields.

summary=SUMMARY,
justification=JUSTIFICATION)


@stage
def convert_to_output_object(message: ControlMessage) -> AgentMorpheusOutput:
"""
Expand All @@ -113,10 +131,14 @@ def convert_to_output_object(message: ControlMessage) -> AgentMorpheusOutput:
sbom: AgentMorpheusInfo.SBOMInfo = message.get_metadata("info.sbom")
vulnerable_dependencies: list[VulnerableDependencies] = message.get_metadata("info.vulnerable_dependencies")

filtered_vulns = [
no_vulns = [
vuln_dep.vuln_id for vuln_dep in vulnerable_dependencies if len(vuln_dep.vulnerable_sbom_packages) == 0
]

# IDs of CVEs whose gathered intel was judged insufficient.
# NOTE(review): the computed field introduced alongside this code is named
# `has_sufficient_intel`; `sufficient_intel` does not appear on the model — confirm.
deficient_intel = [
    cve_intel.get_cve_id() for cve_intel in intel if not cve_intel.has_sufficient_intel
]

# Extract LLMEngine output from message df to dict of {vuln_id: row}
with message.payload().mutable_dataframe() as df:
df2 = df.set_index("vuln_id", drop=False)
Expand All @@ -131,10 +153,10 @@ def convert_to_output_object(message: ControlMessage) -> AgentMorpheusOutput:

if vuln_id in llm_engine_output:
output.append(_parse_agent_morpheus_engine_output(llm_engine_output[vuln_id]))

elif vuln_id in filtered_vulns:
output.append(_get_placeholder_output(vuln_id))

elif vuln_id in deficient_intel:
output.append(_get_deficient_intel_output(vuln_id))
elif vuln_id in no_vulns:
output.append(_get_no_vuln_packages_output(vuln_id))
else:
assert False, "CVE has vulnerable dependencies but there is no LLMEngine output."

Expand Down