Skip to content

Commit

Permalink
Plugins: Yarascan + Vadyarascan Context
Browse files Browse the repository at this point in the history
This attempts to address limitations with the current implementation
of the `YaraScan` and `VadYaraScan` plugins that seriously impacts their
usefulness in the CLI; namely, the inability to view user-defined context
surrounding yara matches in a hexdump format.

In the CLI, users must now enumerate yara hits with one of one the plugins,
then copy information about the hit, such as the PID and offset, to
another location, and re-read the data from the layer in which the match
occurred within volshell, which is a laborious process.

Within volshell, there is no publicly available API on either the
`YaraScan` or `VadYaraScan` classes to enumerate hits and interact with
those values programatically outside of constructing an instance of the
plugin and retrieving values from the `TreeGrid` returned by the `run`
method. In addition to the changes proposed here, we may want to
consider providing classmethods for performing at least yara string
searches without requiring users to manually update the configuration
and construct the plugins via their constructors.
  • Loading branch information
dgmcdona committed Oct 2, 2024
1 parent 5d2a5f9 commit fbcda54
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 23 deletions.
60 changes: 46 additions & 14 deletions volatility3/framework/plugins/windows/vadyarascan.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

import logging
from typing import Iterable, List, Tuple
from typing import Iterable, Iterator, List, NamedTuple, Tuple

from volatility3.framework import interfaces, renderers
from volatility3.framework.configuration import requirements
Expand All @@ -14,6 +14,14 @@
vollog = logging.getLogger(__name__)


class YaraMatch(NamedTuple):
offset: int
pid: int
rule: str
match_string_identifier: str
matched_data: bytes


class VadYaraScan(interfaces.plugins.PluginInterface):
"""Scans all the Virtual Address Descriptor memory maps using yara."""

Expand All @@ -33,7 +41,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]
name="pslist", plugin=pslist.PsList, version=(2, 0, 0)
),
requirements.PluginRequirement(
name="yarascan", plugin=yarascan.YaraScan, version=(2, 0, 0)
name="yarascan", plugin=yarascan.YaraScan, version=(3, 0, 0)
),
requirements.ListRequirement(
name="pid",
Expand All @@ -49,7 +57,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]
# return the combined requirements
return yarascan_requirements + vadyarascan_requirements

def _generator(self):
def enumerate_matches(self) -> Iterator[YaraMatch]:
kernel = self.context.modules[self.config["kernel"]]

rules = yarascan.YaraScan.process_yara_options(dict(self.config))
Expand Down Expand Up @@ -79,37 +87,61 @@ def _generator(self):
if yarascan.YaraScan.yara_returns_instances():
for match_string in match.strings:
for instance in match_string.instances:
yield 0, (
format_hints.Hex(instance.offset + start),
yield YaraMatch(
instance.offset + start,
task.UniqueProcessId,
match.rule,
match_string.identifier,
instance.matched_data,
data[
max(
instance.offset
- self.config["context_before"],
0,
) : instance.offset
+ self.config["context_after"]
],
)
else:
for offset, name, value in match.strings:
yield 0, (
format_hints.Hex(offset + start),
yield YaraMatch(
offset + start,
task.UniqueProcessId,
match.rule,
name,
value,
data[
max(
offset - self.config["context_before"], 0
) : offset
+ self.config["context_after"]
],
)
else:
for match in rules.scan(data).matching_rules:
for match_string in match.patterns:
for instance in match_string.matches:
yield 0, (
format_hints.Hex(instance.offset + start),
yield YaraMatch(
instance.offset + start,
task.UniqueProcessId,
f"{match.namespace}.{match.identifier}",
match_string.identifier,
data[
instance.offset : instance.offset
+ instance.length
max(
instance.offset
- self.config["context_before"],
0,
) : instance.offset
+ self.config["context_after"]
],
)

def _generator(self):
for match in self.enumerate_matches():
yield 0, (
format_hints.Hex(match[0]),
*(match[1:-1]),
format_hints.HexBytes(match[-1]),
)

@staticmethod
def get_vad_maps(
task: interfaces.objects.ObjectInterface,
Expand All @@ -134,7 +166,7 @@ def run(self):
("PID", int),
("Rule", str),
("Component", str),
("Value", bytes),
("Value", format_hints.HexBytes),
],
self._generator(),
)
65 changes: 56 additions & 9 deletions volatility3/framework/plugins/yarascan.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@


class YaraScanner(interfaces.layers.ScannerInterface):
_version = (2, 1, 0)
_version = (3, 0, 0)

# yara.Rules isn't exposed, so we can't type this properly
def __init__(self, rules) -> None:
def __init__(self, rules, context_before=0, context_after=32) -> None:
super().__init__()
if rules is None:
raise ValueError("No rules provided to YaraScanner")
Expand All @@ -50,6 +50,8 @@ def __init__(self, rules) -> None:
if USE_YARA_X
else not tuple(int(x) for x in yara.__version__.split(".")) < (4, 3)
)
self._context_before = context_before
self._context_after = context_after

def __call__(
self, data: bytes, data_offset: int
Expand All @@ -62,7 +64,12 @@ def __call__(
instance.offset + data_offset,
f"{match.namespace}.{match.identifier}",
match_string.identifier,
data[instance.offset : instance.offset + instance.length],
data[
max(instance.offset - self._context_before, 0) : max(
instance.offset + instance.length,
instance.offset + self._context_after,
)
],
)
else:
for match in self._rules.match(data=data):
Expand All @@ -73,11 +80,27 @@ def __call__(
instance.offset + data_offset,
match.rule,
match_string.identifier,
instance.matched_data,
data[
max(
instance.offset - self._context_before, 0
) : max(
instance.offset + len(instance.matched_data),
instance.offset + self._context_after,
)
],
)
else:
for offset, name, value in match.strings:
yield (offset + data_offset, match.rule, name, value)
yield (
offset + data_offset,
match.rule,
name,
data[
max(offset - self._context_before, 0) : max(
offset + self._context_after, offset + len(value)
)
],
)

@staticmethod
def get_rule(rule):
Expand Down Expand Up @@ -106,9 +129,12 @@ class YaraScan(plugins.PluginInterface):
"""Scans kernel memory using yara rules (string or file)."""

_required_framework_version = (2, 0, 0)
_version = (2, 0, 0)
_version = (3, 0, 0)
_yara_x = USE_YARA_X

CONTEXT_BEFORE_DEFAULT = 0
CONTEXT_AFTER_DEFAULT = 32

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
"""Returns the requirements needed to run yarascan directly, combining the TranslationLayerRequirement
Expand Down Expand Up @@ -164,6 +190,19 @@ def get_yarascan_option_requirements(
description="Set the maximum size (default is 1GB)",
optional=True,
),
requirements.IntRequirement(
name="context_before",
description="Number of bytes of context to display before start of match",
default=cls.CONTEXT_BEFORE_DEFAULT,
optional=True,
),
requirements.IntRequirement(
name="context_after",
description="Number of bytes of context to display after start of match. "
"If the size of the match exceeds this value, the full match is shown",
default=cls.CONTEXT_AFTER_DEFAULT,
optional=True,
),
]

@classmethod
Expand Down Expand Up @@ -199,17 +238,25 @@ def _generator(self):

layer = self.context.layers[self.config["primary"]]
for offset, rule_name, name, value in layer.scan(
context=self.context, scanner=YaraScanner(rules=rules)
context=self.context,
scanner=YaraScanner(
rules, self.config["context_before"], self.config["context_after"]
),
):
yield 0, (format_hints.Hex(offset), rule_name, name, value)
yield 0, (
format_hints.Hex(offset),
rule_name,
name,
format_hints.HexBytes(value),
)

def run(self):
return renderers.TreeGrid(
[
("Offset", format_hints.Hex),
("Rule", str),
("Component", str),
("Value", bytes),
("Value", format_hints.HexBytes),
],
self._generator(),
)

0 comments on commit fbcda54

Please sign in to comment.