diff --git a/volatility3/framework/plugins/windows/vadyarascan.py b/volatility3/framework/plugins/windows/vadyarascan.py index efcc70d07f..381d74c5e5 100644 --- a/volatility3/framework/plugins/windows/vadyarascan.py +++ b/volatility3/framework/plugins/windows/vadyarascan.py @@ -3,7 +3,7 @@ # import logging -from typing import Iterable, List, Tuple +from typing import Iterable, Iterator, List, NamedTuple, Tuple from volatility3.framework import interfaces, renderers from volatility3.framework.configuration import requirements @@ -14,6 +14,14 @@ vollog = logging.getLogger(__name__) +class YaraMatch(NamedTuple): + offset: int + pid: int + rule: str + match_string_identifier: str + matched_data: bytes + + class VadYaraScan(interfaces.plugins.PluginInterface): """Scans all the Virtual Address Descriptor memory maps using yara.""" @@ -33,7 +41,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] name="pslist", plugin=pslist.PsList, version=(2, 0, 0) ), requirements.PluginRequirement( - name="yarascan", plugin=yarascan.YaraScan, version=(2, 0, 0) + name="yarascan", plugin=yarascan.YaraScan, version=(3, 0, 0) ), requirements.ListRequirement( name="pid", @@ -49,7 +57,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] # return the combined requirements return yarascan_requirements + vadyarascan_requirements - def _generator(self): + def enumerate_matches(self) -> Iterator[YaraMatch]: kernel = self.context.modules[self.config["kernel"]] rules = yarascan.YaraScan.process_yara_options(dict(self.config)) @@ -79,37 +87,61 @@ def _generator(self): if yarascan.YaraScan.yara_returns_instances(): for match_string in match.strings: for instance in match_string.instances: - yield 0, ( - format_hints.Hex(instance.offset + start), + yield YaraMatch( + instance.offset + start, task.UniqueProcessId, match.rule, match_string.identifier, - instance.matched_data, + data[ + max( + instance.offset + - self.config["context_before"], + 0, + ) : instance.offset + + self.config["context_after"] + ], ) else: for offset, name, value in match.strings: - yield 0, ( - format_hints.Hex(offset + start), + yield YaraMatch( + offset + start, task.UniqueProcessId, match.rule, name, - value, + data[ + max( + offset - self.config["context_before"], 0 + ) : offset + + self.config["context_after"] + ], ) else: for match in rules.scan(data).matching_rules: for match_string in match.patterns: for instance in match_string.matches: - yield 0, ( - format_hints.Hex(instance.offset + start), + yield YaraMatch( + instance.offset + start, task.UniqueProcessId, f"{match.namespace}.{match.identifier}", match_string.identifier, data[ - instance.offset : instance.offset - + instance.length + max( + instance.offset + - self.config["context_before"], + 0, + ) : instance.offset + + self.config["context_after"] ], ) + def _generator(self): + for match in self.enumerate_matches(): + yield 0, ( + format_hints.Hex(match[0]), + *(match[1:-1]), + format_hints.HexBytes(match[-1]), + ) + @staticmethod def get_vad_maps( task: interfaces.objects.ObjectInterface, @@ -134,7 +166,7 @@ def run(self): ("PID", int), ("Rule", str), ("Component", str), - ("Value", bytes), + ("Value", format_hints.HexBytes), ], self._generator(), ) diff --git a/volatility3/framework/plugins/yarascan.py b/volatility3/framework/plugins/yarascan.py index 310bbd0725..737af01c95 100644 --- a/volatility3/framework/plugins/yarascan.py +++ b/volatility3/framework/plugins/yarascan.py @@ -37,10 +37,10 @@ class YaraScanner(interfaces.layers.ScannerInterface): - _version = (2, 1, 0) + _version = (3, 0, 0) # yara.Rules isn't exposed, so we can't type this properly - def __init__(self, rules) -> None: + def __init__(self, rules, context_before=0, context_after=32) -> None: super().__init__() if rules is None: raise ValueError("No rules provided to YaraScanner") @@ -50,6 +50,8 @@ def __init__(self, rules) -> None: if USE_YARA_X else not tuple(int(x) for x in yara.__version__.split(".")) < (4, 3) ) + self._context_before = context_before + self._context_after = context_after def __call__( self, data: bytes, data_offset: int @@ -62,7 +64,12 @@ def __call__( instance.offset + data_offset, f"{match.namespace}.{match.identifier}", match_string.identifier, - data[instance.offset : instance.offset + instance.length], + data[ + max(instance.offset - self._context_before, 0) : max( + instance.offset + instance.length, + instance.offset + self._context_after, + ) + ], ) else: for match in self._rules.match(data=data): @@ -73,11 +80,27 @@ def __call__( instance.offset + data_offset, match.rule, match_string.identifier, - instance.matched_data, + data[ + max( + instance.offset - self._context_before, 0 + ) : max( + instance.offset + len(instance.matched_data), + instance.offset + self._context_after, + ) + ], ) else: for offset, name, value in match.strings: - yield (offset + data_offset, match.rule, name, value) + yield ( + offset + data_offset, + match.rule, + name, + data[ + max(offset - self._context_before, 0) : max( + offset + self._context_after, offset + len(value) + ) + ], + ) @staticmethod def get_rule(rule): @@ -106,9 +129,12 @@ class YaraScan(plugins.PluginInterface): """Scans kernel memory using yara rules (string or file).""" _required_framework_version = (2, 0, 0) - _version = (2, 0, 0) + _version = (3, 0, 0) _yara_x = USE_YARA_X + CONTEXT_BEFORE_DEFAULT = 0 + CONTEXT_AFTER_DEFAULT = 32 + @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: """Returns the requirements needed to run yarascan directly, combining the TranslationLayerRequirement @@ -164,6 +190,19 @@ def get_yarascan_option_requirements( description="Set the maximum size (default is 1GB)", optional=True, ), + requirements.IntRequirement( + name="context_before", + description="Number of bytes of context to display before start of match", + default=cls.CONTEXT_BEFORE_DEFAULT, + optional=True, + ), + requirements.IntRequirement( + name="context_after", + description="Number of bytes of context to display after start of match. " + "If the size of the match exceeds this value, the full match is shown", + default=cls.CONTEXT_AFTER_DEFAULT, + optional=True, + ), ] @classmethod @@ -199,9 +238,17 @@ def _generator(self): layer = self.context.layers[self.config["primary"]] for offset, rule_name, name, value in layer.scan( - context=self.context, scanner=YaraScanner(rules=rules) + context=self.context, + scanner=YaraScanner( + rules, self.config["context_before"], self.config["context_after"] + ), ): - yield 0, (format_hints.Hex(offset), rule_name, name, value) + yield 0, ( + format_hints.Hex(offset), + rule_name, + name, + format_hints.HexBytes(value), + ) def run(self): return renderers.TreeGrid( @@ -209,7 +256,7 @@ def run(self): ("Offset", format_hints.Hex), ("Rule", str), ("Component", str), - ("Value", bytes), + ("Value", format_hints.HexBytes), ], self._generator(), )