From fbcda5473ec93a3f3f8784ff1af76d3f7f559af7 Mon Sep 17 00:00:00 2001 From: David McDonald Date: Tue, 1 Oct 2024 19:02:26 -0500 Subject: [PATCH] Plugins: Yarascan + Vadyarascan Context This attempts to address limitations with the current implementation of the `YaraScan` and `VadYaraScan` plugins that seriously impact their usefulness in the CLI; namely, the inability to view user-defined context surrounding yara matches in a hexdump format. In the CLI, users must currently enumerate yara hits with one of the plugins, then copy information about the hit, such as the PID and offset, to another location, and re-read the data from the layer in which the match occurred within volshell, which is a laborious process. Within volshell, there is no publicly available API on either the `YaraScan` or `VadYaraScan` classes to enumerate hits and interact with those values programmatically outside of constructing an instance of the plugin and retrieving values from the `TreeGrid` returned by the `run` method. In addition to the changes proposed here, we may want to consider providing classmethods for performing at least yara string searches without requiring users to manually update the configuration and construct the plugins via their constructors. 
--- .../framework/plugins/windows/vadyarascan.py | 60 +++++++++++++---- volatility3/framework/plugins/yarascan.py | 65 ++++++++++++++++--- 2 files changed, 102 insertions(+), 23 deletions(-) diff --git a/volatility3/framework/plugins/windows/vadyarascan.py b/volatility3/framework/plugins/windows/vadyarascan.py index efcc70d07f..381d74c5e5 100644 --- a/volatility3/framework/plugins/windows/vadyarascan.py +++ b/volatility3/framework/plugins/windows/vadyarascan.py @@ -3,7 +3,7 @@ # import logging -from typing import Iterable, List, Tuple +from typing import Iterable, Iterator, List, NamedTuple, Tuple from volatility3.framework import interfaces, renderers from volatility3.framework.configuration import requirements @@ -14,6 +14,14 @@ vollog = logging.getLogger(__name__) +class YaraMatch(NamedTuple): + offset: int + pid: int + rule: str + match_string_identifier: str + matched_data: bytes + + class VadYaraScan(interfaces.plugins.PluginInterface): """Scans all the Virtual Address Descriptor memory maps using yara.""" @@ -33,7 +41,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] name="pslist", plugin=pslist.PsList, version=(2, 0, 0) ), requirements.PluginRequirement( - name="yarascan", plugin=yarascan.YaraScan, version=(2, 0, 0) + name="yarascan", plugin=yarascan.YaraScan, version=(3, 0, 0) ), requirements.ListRequirement( name="pid", @@ -49,7 +57,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] # return the combined requirements return yarascan_requirements + vadyarascan_requirements - def _generator(self): + def enumerate_matches(self) -> Iterator[YaraMatch]: kernel = self.context.modules[self.config["kernel"]] rules = yarascan.YaraScan.process_yara_options(dict(self.config)) @@ -79,37 +87,61 @@ def _generator(self): if yarascan.YaraScan.yara_returns_instances(): for match_string in match.strings: for instance in match_string.instances: - yield 0, ( - format_hints.Hex(instance.offset + start), + 
yield YaraMatch( + instance.offset + start, task.UniqueProcessId, match.rule, match_string.identifier, - instance.matched_data, + data[ + max( + instance.offset + - self.config["context_before"], + 0, + ) : instance.offset + + self.config["context_after"] + ], ) else: for offset, name, value in match.strings: - yield 0, ( - format_hints.Hex(offset + start), + yield YaraMatch( + offset + start, task.UniqueProcessId, match.rule, name, - value, + data[ + max( + offset - self.config["context_before"], 0 + ) : offset + + self.config["context_after"] + ], ) else: for match in rules.scan(data).matching_rules: for match_string in match.patterns: for instance in match_string.matches: - yield 0, ( - format_hints.Hex(instance.offset + start), + yield YaraMatch( + instance.offset + start, task.UniqueProcessId, f"{match.namespace}.{match.identifier}", match_string.identifier, data[ - instance.offset : instance.offset - + instance.length + max( + instance.offset + - self.config["context_before"], + 0, + ) : instance.offset + + self.config["context_after"] ], ) + def _generator(self): + for match in self.enumerate_matches(): + yield 0, ( + format_hints.Hex(match[0]), + *(match[1:-1]), + format_hints.HexBytes(match[-1]), + ) + @staticmethod def get_vad_maps( task: interfaces.objects.ObjectInterface, @@ -134,7 +166,7 @@ def run(self): ("PID", int), ("Rule", str), ("Component", str), - ("Value", bytes), + ("Value", format_hints.HexBytes), ], self._generator(), ) diff --git a/volatility3/framework/plugins/yarascan.py b/volatility3/framework/plugins/yarascan.py index 310bbd0725..737af01c95 100644 --- a/volatility3/framework/plugins/yarascan.py +++ b/volatility3/framework/plugins/yarascan.py @@ -37,10 +37,10 @@ class YaraScanner(interfaces.layers.ScannerInterface): - _version = (2, 1, 0) + _version = (3, 0, 0) # yara.Rules isn't exposed, so we can't type this properly - def __init__(self, rules) -> None: + def __init__(self, rules, context_before=0, context_after=32) -> None: 
super().__init__() if rules is None: raise ValueError("No rules provided to YaraScanner") @@ -50,6 +50,8 @@ def __init__(self, rules) -> None: if USE_YARA_X else not tuple(int(x) for x in yara.__version__.split(".")) < (4, 3) ) + self._context_before = context_before + self._context_after = context_after def __call__( self, data: bytes, data_offset: int @@ -62,7 +64,12 @@ def __call__( instance.offset + data_offset, f"{match.namespace}.{match.identifier}", match_string.identifier, - data[instance.offset : instance.offset + instance.length], + data[ + max(instance.offset - self._context_before, 0) : max( + instance.offset + instance.length, + instance.offset + self._context_after, + ) + ], ) else: for match in self._rules.match(data=data): @@ -73,11 +80,27 @@ def __call__( instance.offset + data_offset, match.rule, match_string.identifier, - instance.matched_data, + data[ + max( + instance.offset - self._context_before, 0 + ) : max( + instance.offset + len(instance.matched_data), + instance.offset + self._context_after, + ) + ], ) else: for offset, name, value in match.strings: - yield (offset + data_offset, match.rule, name, value) + yield ( + offset + data_offset, + match.rule, + name, + data[ + max(offset - self._context_before, 0) : max( + offset + self._context_after, offset + len(value) + ) + ], + ) @staticmethod def get_rule(rule): @@ -106,9 +129,12 @@ class YaraScan(plugins.PluginInterface): """Scans kernel memory using yara rules (string or file).""" _required_framework_version = (2, 0, 0) - _version = (2, 0, 0) + _version = (3, 0, 0) _yara_x = USE_YARA_X + CONTEXT_BEFORE_DEFAULT = 0 + CONTEXT_AFTER_DEFAULT = 32 + @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: """Returns the requirements needed to run yarascan directly, combining the TranslationLayerRequirement @@ -164,6 +190,19 @@ def get_yarascan_option_requirements( description="Set the maximum size (default is 1GB)", optional=True, ), + 
requirements.IntRequirement( + name="context_before", + description="Number of bytes of context to display before start of match", + default=cls.CONTEXT_BEFORE_DEFAULT, + optional=True, + ), + requirements.IntRequirement( + name="context_after", + description="Number of bytes of context to display after start of match. " + "If the size of the match exceeds this value, the full match is shown", + default=cls.CONTEXT_AFTER_DEFAULT, + optional=True, + ), ] @classmethod @@ -199,9 +238,17 @@ def _generator(self): layer = self.context.layers[self.config["primary"]] for offset, rule_name, name, value in layer.scan( - context=self.context, scanner=YaraScanner(rules=rules) + context=self.context, + scanner=YaraScanner( + rules, self.config["context_before"], self.config["context_after"] + ), ): - yield 0, (format_hints.Hex(offset), rule_name, name, value) + yield 0, ( + format_hints.Hex(offset), + rule_name, + name, + format_hints.HexBytes(value), + ) def run(self): return renderers.TreeGrid( @@ -209,7 +256,7 @@ def run(self): ("Offset", format_hints.Hex), ("Rule", str), ("Component", str), - ("Value", bytes), + ("Value", format_hints.HexBytes), ], self._generator(), )