diff --git a/volatility3/cli/__init__.py b/volatility3/cli/__init__.py index 1209d7cdc7..f93b25d3d3 100644 --- a/volatility3/cli/__init__.py +++ b/volatility3/cli/__init__.py @@ -19,6 +19,8 @@ import sys import tempfile import traceback +import hashlib +import lzma from typing import Any, Dict, List, Tuple, Type, Union from urllib import parse, request @@ -45,6 +47,7 @@ ) from volatility3.framework.automagic import stacker from volatility3.framework.configuration import requirements +from volatility3.framework.interfaces.configuration import path_join # Make sure we log everything @@ -247,7 +250,12 @@ def run(self): default=[], action="append", ) - + parser.add_argument( + "--virtmap-cache-path", + help="Path to the virtmap cache file, typically produced by the virtmapscanner plugin.", + default=None, + type=str, + ) parser.set_defaults(**default_config) # We have to filter out help, otherwise parse_known_args will trigger the help message before having @@ -398,6 +406,49 @@ def run(self): plugin_config_path, interfaces.configuration.HierarchicalDict(json_val), ) + if args.virtmap_cache_path: + with open(args.virtmap_cache_path, "rb") as f: + virtmap_cache_content = f.read() + + virtmap_metadata_filename = os.path.join( + constants.CACHE_PATH, + "data_" + hashlib.sha512(virtmap_cache_content).hexdigest() + ".cache", + ) + if os.path.exists(virtmap_metadata_filename): + with open(virtmap_metadata_filename, "r") as f: + map_metadata = json.loads(f.read()) + layers_identifiers = map_metadata["layers_identifiers"] + sections_per_layer = map_metadata["sections_per_layer"] + else: + vollog.debug("Saving virtmap cache file metadata to Volatility3 cache") + raw_json = lzma.decompress(virtmap_cache_content) + json_val: dict = json.loads(raw_json) + layers_identifiers = list(json_val.keys()) + + sections_per_layer = {} + for layer_identifier, sections in json_val.items(): + sections_per_layer[layer_identifier] = list(sections.keys()) + + # Save metadata in the Vol3 cache, to 
avoid the costly + # decompression and deserialization process on each run. + with open(virtmap_metadata_filename, "w+") as f: + json.dump( + { + "layers_identifiers": list(json_val.keys()), + "sections_per_layer": sections_per_layer, + }, + f, + ) + + ctx.config[path_join("virtmap_cache", "filepath")] = args.virtmap_cache_path + ctx.config[path_join("virtmap_cache", "layers_identifiers")] = ( + layers_identifiers + ) + ctx.config.splice( + path_join("virtmap_cache", "sections_per_layer"), + interfaces.configuration.HierarchicalDict(sections_per_layer), + ) + vollog.log(constants.LOGLEVEL_VV, "Successfully loaded virtmap cache file") # It should be up to the UI to determine which automagics to run, so this is before BACK TO THE FRAMEWORK automagics = automagic.choose_automagic(automagics, plugin) @@ -451,7 +502,7 @@ def run(self): ) args.save_config = "config.json" if args.save_config: - vollog.debug("Writing out configuration data to {args.save_config}") + vollog.debug(f"Writing out configuration data to {args.save_config}") if os.path.exists(os.path.abspath(args.save_config)): parser.error( f"Cannot write configuration: file {args.save_config} already exists" diff --git a/volatility3/framework/interfaces/layers.py b/volatility3/framework/interfaces/layers.py index e2a68780a7..ce57cfc4e1 100644 --- a/volatility3/framework/interfaces/layers.py +++ b/volatility3/framework/interfaces/layers.py @@ -15,10 +15,13 @@ import threading import traceback import types +import lzma +import json from abc import ABCMeta, abstractmethod from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union from volatility3.framework import constants, exceptions, interfaces +from volatility3.framework.interfaces.configuration import path_join vollog = logging.getLogger(__name__) @@ -468,6 +471,49 @@ def _encode_data( # ## Read/Write functions for mapped pages + def _access_virtmap_cache(self, section: Tuple[int, int]) -> Optional[list]: + """Checks and loads the 
virtmap cache. + + The virtmap cache corresponds to a previous _scan_iterator + output, typically loaded from a file. + Args: + sections: sections (start, size) to retrieve from the cache + Returns: + A list containing mappings for a specific section of this layer""" + + # Check if layer is fully constructed first + if self.context.config.get( + path_join("virtmap_cache", "filepath") + ) and self.config.get("class"): + filepath = self.context.config[path_join("virtmap_cache", "filepath")] + layer_identifier = path_join(self.config["class"], self.name) + layers_identifiers = self.context.config[ + path_join("virtmap_cache", "layers_identifiers") + ] + # Exact section match only, even if a requested section would *fit* + # inside one available in the cache. + if ( + layer_identifier in layers_identifiers + and str(section) + in self.context.config[ + path_join("virtmap_cache", "sections_per_layer", layer_identifier) + ] + ): + # Avoid decompressing and deserializing the file + # more than once. Saves time, but costs more RAM. 
+ if not hasattr(self, "_virtmap_cache_dict"): + with open(filepath, "rb") as f: + raw_json = lzma.decompress(f.read()) + # Can be sped up via the orjson library + self._virtmap_cache_dict = json.loads(raw_json) + + vollog.log( + constants.LOGLEVEL_VVV, + f'Applying virtmap cache to section "{section}" of layer "{layer_identifier}"', + ) + return self._virtmap_cache_dict[layer_identifier][str(section)] + return None + @functools.lru_cache(maxsize=512) def read(self, offset: int, length: int, pad: bool = False) -> bytes: """Reads an offset for length bytes and returns 'bytes' (not 'str') of @@ -551,6 +597,12 @@ def _scan_iterator( assumed to have no holes """ for section_start, section_length in sections: + # Check the virtmap cache and use it if available + cache = self._access_virtmap_cache((section_start, section_length)) + if cache: + for map in cache: + yield map + continue output: List[Tuple[str, int, int]] = [] # Hold the offsets of each chunk (including how much has been filled) diff --git a/volatility3/framework/plugins/windows/virtmapscanner.py b/volatility3/framework/plugins/windows/virtmapscanner.py new file mode 100644 index 0000000000..9961f1a269 --- /dev/null +++ b/volatility3/framework/plugins/windows/virtmapscanner.py @@ -0,0 +1,201 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# + +import logging +import functools +import json +import lzma +import traceback + +from typing import Iterable, Type, Tuple, Dict +from volatility3.framework import renderers, interfaces, constants, exceptions +from volatility3.framework.configuration import requirements +from volatility3.framework.layers.scanners import BytesScanner +from volatility3.framework.interfaces.configuration import path_join +from volatility3.plugins.windows import pslist + +vollog = logging.getLogger(__name__) + + +class 
class VirtMapScanner(interfaces.plugins.PluginInterface):
    """Scans the entire kernel virtual memory space by default and dumps its
    content to the disk. Allows to speed-up mapping operations afterwards, by
    specifying the output file as an argument to --virtmap-cache-path."""

    _required_framework_version = (2, 0, 0)
    _version = (1, 0, 0)

    @classmethod
    def get_requirements(cls):
        return [
            requirements.ModuleRequirement(
                name="kernel",
                description="Windows kernel",
                architectures=["Intel32", "Intel64"],
            ),
            requirements.PluginRequirement(
                name="pslist", plugin=pslist.PsList, version=(2, 0, 0)
            ),
            requirements.BooleanRequirement(
                name="scan-processes",
                description="Scan each process address space",
                default=False,
                optional=True,
            ),
        ]

    @classmethod
    def virtmap_cache_file_producer(
        cls,
        scans_results: dict,
        open_method: Type[interfaces.plugins.FileHandlerInterface],
        filename: str = "virtmapcache.json.xz",
    ) -> str:
        """Dumps scanning results into a JSON string,
        compresses it and writes it to disk.

        Args:
            scans_results: the layers scans results
            open_method: class to provide context manager for opening the file
            filename: the filename to use when dumping the file to disk

        Returns:
            The preferred filename of the file written to disk
        """
        file_handle = open_method(filename)
        try:
            json_data = json.dumps(scans_results).encode()
            xz_data = lzma.compress(json_data)
            file_handle.write(xz_data)
        finally:
            # Release the handle even if serialization or the write fails.
            file_handle.close()

        return file_handle.preferred_filename

    @classmethod
    def virtmap_cache_scanner(
        cls,
        layer: interfaces.layers.DataLayerInterface,
        sections: Iterable[Tuple[int, int]],
        progress_callback: constants.ProgressCallback = None,
    ) -> dict:
        """Scans the provided layer sections

        Args:
            layer: the layer to scan
            sections: the sections to scan on the layer
            progress_callback: optional callable reporting scan progress

        Returns:
            A dictionary mapping each section to the section scan result
        """
        layer_results = {}
        # An empty BytesScanner never matches; it is only used to drive
        # _scan_iterator so the produced mappings can be recorded.
        scanner = BytesScanner("")
        for section in sections:
            scan_iterator = functools.partial(layer._scan_iterator, scanner, [section])
            scan_metric = layer._scan_metric(scanner, [section])
            scan_values = []
            try:
                for value in scan_iterator():
                    scan_values.append(value)
                    if progress_callback:
                        progress_callback(
                            scan_metric(value[1]),
                            f"Scanning {layer.name} using {scanner.__class__.__name__}",
                        )
            except Exception as e:
                # Broad catch mirrors the framework's scanning behavior:
                # a failure on one section must not abort the whole run.
                vollog.debug(f"Scan Failure: {str(e)}")
                vollog.log(
                    constants.LOGLEVEL_VVV,
                    "\n".join(
                        traceback.TracebackException.from_exception(e).format(
                            chain=True
                        )
                    ),
                )

            layer_results[str(section)] = scan_values

        return layer_results

    @classmethod
    def virtmap_cache_producer(
        cls,
        layers_sections: Dict[
            interfaces.layers.DataLayerInterface, Iterable[Tuple[int, int]]
        ],
        progress_callback: constants.ProgressCallback = None,
    ) -> dict:
        """Scans a list of layers and sections

        Args:
            layers_sections: a dictionary containing layers and a list of sections to scan on each layer
            progress_callback: optional callable reporting scan progress

        Returns:
            A dictionary mapping each layer identifier to the corresponding scan result
        """
        layers_results = {}

        for layer, sections in layers_sections.items():
            layer_results = cls.virtmap_cache_scanner(
                layer, sections, progress_callback
            )
            # Clearly identify this layer, by concatenating the layer class and the layer name
            layer_identifier = path_join(layer.config["class"], layer.name)
            layers_results[layer_identifier] = layer_results

        return layers_results

    def _generator(self):
        kernel = self.context.modules[self.config["kernel"]]
        kernel_layer = self.context.layers[kernel.layer_name]
        # Scan the whole kernel virtual address space by default.
        layers_sections = {
            kernel_layer: [
                (
                    kernel_layer.minimum_address,
                    kernel_layer.maximum_address - kernel_layer.minimum_address,
                )
            ]
        }
        if self.config["scan-processes"]:
            for proc in pslist.PsList.list_processes(
                context=self.context,
                layer_name=kernel.layer_name,
                symbol_table=kernel.symbol_table_name,
            ):
                proc_id = "Unknown"
                try:
                    proc_id = proc.UniqueProcessId
                    proc_layer_name = proc.add_process_layer()
                except exceptions.InvalidAddressException as excp:
                    # Skip processes whose layer cannot be constructed.
                    vollog.debug(
                        f"Process {proc_id}: invalid address {excp.invalid_address}"
                        f" in layer {excp.layer_name}"
                    )
                    continue

                proc_layer = self.context.layers[proc_layer_name]
                layers_sections[proc_layer] = [
                    (
                        proc_layer.minimum_address,
                        proc_layer.maximum_address - proc_layer.minimum_address,
                    )
                ]

        layers_results = self.virtmap_cache_producer(
            layers_sections, self._progress_callback
        )
        virtmapcache_filename = self.virtmap_cache_file_producer(
            layers_results, self.open
        )

        yield (
            0,
            (virtmapcache_filename,),
        )

    def run(self):
        return renderers.TreeGrid(
            [
                ("Virtual mappings cache file output", str),
            ],
            self._generator(),
        )