[Core, plugin]: Virtual mappings dumping and caching #1237

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions volatility3/cli/__init__.py
@@ -19,6 +19,8 @@
import sys
import tempfile
import traceback
import hashlib
import lzma
from typing import Any, Dict, List, Tuple, Type, Union
from urllib import parse, request

@@ -45,6 +47,7 @@
)
from volatility3.framework.automagic import stacker
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces.configuration import path_join

# Make sure we log everything

@@ -247,7 +250,12 @@ def run(self):
default=[],
action="append",
)

parser.add_argument(
Member: I'm not yet comfortable with this machinery. It seems really bodged into the workings. :S

Member: Sorry, that's not a helpful comment. It's just a feeling I get: it doesn't feel like this has been smoothly integrated into how everything works, but rather has lots of external moving bits (like files the user has to pass in), which get stashed in a non-unique place in the config to be used where they're needed. As I say, I think there may be a better way that avoids the file issues, improves on the uniqueness, and wouldn't require user interaction.

Contributor Author: The use of this cache might silently fill the filesystem without the user noticing, even though the generated file is xz-compressed. Indeed, everything could be stored in the Volatility3 cache, but that would imply the --clear-cache command would also remove it when a user is trying to fix a symbol cache problem.

Member: Unintentionally filling the disk is a potential issue, but we can build cache checks into runs to clear out old files if it becomes a problem. What's the expected file size for one of these xz files once it's made?

We'd want --clear-cache to do exactly that: it's supposed to set you back to square one, so you can rule the cache out as a contributing factor to a problem.

"--virtmap-cache-path",
help="Path to the virtmap cache file, typically produced by the virtmapscanner plugin.",
default=None,
type=str,
)
parser.set_defaults(**default_config)

# We have to filter out help, otherwise parse_known_args will trigger the help message before having
@@ -398,6 +406,49 @@ def run(self):
plugin_config_path,
interfaces.configuration.HierarchicalDict(json_val),
)
if args.virtmap_cache_path:
Member: I'm just not so keen on adding this functionality this way. What happens if we discover we've missed something in the file format, or there's a more efficient way of storing the data, or something else crops up? The more we expose this to the user and make them do work to get it, the more can go wrong. There's a comment further down that suggests a different way of working the cache that might solve a bunch of problems.

Contributor Author (Abyss-W4tcher, Sep 3, 2024): I agree; however, both situations (manual / automatic) might produce different issues. With a manual implementation, at least it is easy to remove the argument and see if it was the problem (hopefully it isn't, but I understand the concern).

Member: True, but it'll still be bug reports we're trying to diagnose it from, so recommending --clear-cache seems just as straightforward.

with open(args.virtmap_cache_path, "rb") as f:
virtmap_cache_content = f.read()

virtmap_metadata_filename = os.path.join(
constants.CACHE_PATH,
"data_" + hashlib.sha512(virtmap_cache_content).hexdigest() + ".cache",
)
if os.path.exists(virtmap_metadata_filename):
with open(virtmap_metadata_filename, "r") as f:
map_metadata = json.loads(f.read())
layers_identifiers = map_metadata["layers_identifiers"]
sections_per_layer = map_metadata["sections_per_layer"]
else:
vollog.debug("Saving virtmap cache file metadata to Volatility3 cache")
raw_json = lzma.decompress(virtmap_cache_content)
json_val: dict = json.loads(raw_json)
layers_identifiers = list(json_val.keys())

sections_per_layer = {}
for layer_identifier, sections in json_val.items():
sections_per_layer[layer_identifier] = list(sections.keys())

# Save metadata in the Vol3 cache, to avoid the costly
# decompression and deserialization process on each run.
with open(virtmap_metadata_filename, "w+") as f:
json.dump(
{
"layers_identifiers": list(json_val.keys()),
"sections_per_layer": sections_per_layer,
},
f,
)

ctx.config[path_join("virtmap_cache", "filepath")] = args.virtmap_cache_path
ctx.config[path_join("virtmap_cache", "layers_identifiers")] = (
layers_identifiers
)
ctx.config.splice(
path_join("virtmap_cache", "sections_per_layer"),
interfaces.configuration.HierarchicalDict(sections_per_layer),
)
vollog.log(constants.LOGLEVEL_VV, "Successfully loaded virtmap cache file")
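For illustration, the metadata stashed under constants.CACHE_PATH is small; a data_<sha512>.cache file for a single kernel layer would look roughly like the Python literal below (the identifier follows the examples discussed later in this thread, and the section values are illustrative):

# Illustrative shape of the cached virtmap metadata:
{
    "layers_identifiers": [
        "volatility3.framework.layers.intel.WindowsIntel32e.layer_name"
    ],
    "sections_per_layer": {
        "volatility3.framework.layers.intel.WindowsIntel32e.layer_name": [
            "(0, 281474976710655)"
        ]
    }
}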

# It should be up to the UI to determine which automagics to run, so this is before BACK TO THE FRAMEWORK
automagics = automagic.choose_automagic(automagics, plugin)
@@ -451,7 +502,7 @@
)
args.save_config = "config.json"
if args.save_config:
- vollog.debug("Writing out configuration data to {args.save_config}")
+ vollog.debug(f"Writing out configuration data to {args.save_config}")
if os.path.exists(os.path.abspath(args.save_config)):
parser.error(
f"Cannot write configuration: file {args.save_config} already exists"
52 changes: 52 additions & 0 deletions volatility3/framework/interfaces/layers.py
@@ -15,10 +15,13 @@
import threading
import traceback
import types
import lzma
import json
from abc import ABCMeta, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union

from volatility3.framework import constants, exceptions, interfaces
from volatility3.framework.interfaces.configuration import path_join

vollog = logging.getLogger(__name__)

@@ -468,6 +471,49 @@ def _encode_data(

# ## Read/Write functions for mapped pages

def _access_virtmap_cache(self, section: Tuple[int, int]) -> Optional[list]:
"""Checks and loads the virtmap cache.

The virtmap cache corresponds to a previous _scan_iterator
output, typically loaded from a file.

Args:
section: the section (start, size) to retrieve from the cache

Returns:
The list of mappings for this section of the layer, or None on a cache miss"""

# Check if layer is fully constructed first
if self.context.config.get(
Member: What object has the requirement on this? It should be that the layer that uses it has an optional requirement on it, so that it'll get saved into any config files that get constructed. The layer identifier isn't even close to unique (almost all plugins use the same layer name and class), so this will go badly when you have multiple layers you want to use this on (or a config you want to work for multiple images).

Contributor Author: The layer_identifier should be unique? Example layer_identifiers:

  • volatility3.framework.layers.intel.WindowsIntel32e.layer_name -> identifies the kernel layer (TranslationLayerRequirement name)
  • volatility3.framework.layers.intel.WindowsIntel32e.layer_name_Process5948 -> identifies the process 5948

I might have missed something, but it shouldn't be possible to have a duplicate layer string identifier in the layers pool?

This specific "config"/"cache" is intended to be used for a unique memory capture, as even a dump from the same kernel a few seconds later would have different mappings.

Member: Layer names are unique for a run of Volatility, but they'll likely all say primary1 or memory_layer1 or something. Across runs they're unlikely to even be different.

The process layers, similarly, won't be different across different images that have processes with the same pid... I don't think a dump from a few seconds later would have a different cache identifier? The layer name would likely be the same, and many of the process layer names would too, but it could also match a wildly different image...

path_join("virtmap_cache", "filepath")
) and self.config.get("class"):
filepath = self.context.config[path_join("virtmap_cache", "filepath")]
layer_identifier = path_join(self.config["class"], self.name)
layers_identifiers = self.context.config[
path_join("virtmap_cache", "layers_identifiers")
]
# Exact section match only, even if a requested section would *fit*
# inside one available in the cache.
if (
layer_identifier in layers_identifiers
and str(section)
in self.context.config[
path_join("virtmap_cache", "sections_per_layer", layer_identifier)
]
):
# Avoid decompressing and deserializing the file
# more than once. Saves time, but costs more RAM.
if not hasattr(self, "_virtmap_cache_dict"):
with open(filepath, "rb") as f:
Member: Ugh, file handling is tricky. If this is running as a web interface on a remote server, standard open operations won't work. This should be a constructed ResourceAccessor class's open method, so that it can open URLs and compressed files automatically. It would then accept a URL rather than a file path.
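A minimal sketch of that suggestion, assuming volatility3's ResourceAccessor (which opens URLs as well as local paths, and transparently decompresses recognized formats such as xz):

# Hypothetical replacement for the plain open()/lzma.decompress() below;
# ResourceAccessor handles URLs and decompresses recognized formats itself.
from volatility3.framework.layers import resources

with resources.ResourceAccessor().open(filepath, "rb") as f:
    raw_json = f.read()  # already decompressed if the file is recognized as xz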

raw_json = lzma.decompress(f.read())
# Can be sped up via the orjson library
self._virtmap_cache_dict = json.loads(raw_json)

vollog.log(
constants.LOGLEVEL_VVV,
f'Applying virtmap cache to section "{section}" of layer "{layer_identifier}"',
)
return self._virtmap_cache_dict[layer_identifier][str(section)]
return None

@functools.lru_cache(maxsize=512)
def read(self, offset: int, length: int, pad: bool = False) -> bytes:
"""Reads an offset for length bytes and returns 'bytes' (not 'str') of
@@ -551,6 +597,12 @@ def _scan_iterator(
assumed to have no holes
"""
for section_start, section_length in sections:
# Check the virtmap cache and use it if available
Member: If this is essentially going to generate it, why don't we... try to look it up in a cache, and if that fails, run the rest of the code and then store those results into the cache? Rather than a completely separate plugin, that speeds up everything without costing any additional time. It also means the data can be stored in the local on-disk cache (like processed symbol files), which saves us having to worry about the user getting the data into the system, or messing with the UI at all.

Contributor Author: Generating it here would fit if automatic mapping cache generation were implemented. But wouldn't this result in filling the cache and adding processing time without the user requesting it?

An additional plugin also allows a single, unified method for saving the cache, instead of it being generated as a side effect of whichever plugin happens to run first. As said, it is not a "ground-breaking" feature, and explicit core integration might come with concerns for users (this PR doesn't really add much more core logic if the feature isn't turned on).

Member: But isn't that exactly what this is generating here? Why not just save it, rather than generating it in a different plugin as part of a separate step? This could also build up the cache as needed: if parts haven't been computed yet, it does the full task, but if they have, it uses the cache.

Implementing a cache (which might be wrong unless it's matched to the image very carefully) already seems like quite a deep-rooted feature to me...
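A rough sketch of the read-through idea being discussed; the two cache helpers named here are hypothetical, not part of this PR:

# Hypothetical read-through variant of the section loop below: consult the
# cache first, and only on a miss run the existing iterator logic, storing
# the result for next time.
for section in sections:
    cached = self._lookup_virtmap_cache(section)               # hypothetical lookup
    if cached is not None:
        yield from cached
        continue
    computed = list(self._compute_section_mappings(section))   # the existing logic
    self._store_virtmap_cache(section, computed)               # hypothetical store
    yield from computed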

cache = self._access_virtmap_cache((section_start, section_length))
if cache:
for map in cache:
yield map
continue
output: List[Tuple[str, int, int]] = []

# Hold the offsets of each chunk (including how much has been filled)
201 changes: 201 additions & 0 deletions volatility3/framework/plugins/windows/virtmapscanner.py
@@ -0,0 +1,201 @@
# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
import functools
import json
import lzma
import traceback

from typing import Iterable, Type, Tuple, Dict
from volatility3.framework import renderers, interfaces, constants, exceptions
from volatility3.framework.configuration import requirements
from volatility3.framework.layers.scanners import BytesScanner
from volatility3.framework.interfaces.configuration import path_join
from volatility3.plugins.windows import pslist

vollog = logging.getLogger(__name__)


class VirtMapScanner(interfaces.plugins.PluginInterface):
"""Scans the entire kernel virtual memory space by default and dumps its content to the disk. Allows to speed-up mapping operations afterwards, by specifying the output file as an argument to --virtmap-cache-path."""

_required_framework_version = (2, 0, 0)
_version = (1, 0, 0)

@classmethod
def get_requirements(cls):
return [
requirements.ModuleRequirement(
name="kernel",
description="Windows kernel",
architectures=["Intel32", "Intel64"],
),
requirements.PluginRequirement(
name="pslist", plugin=pslist.PsList, version=(2, 0, 0)
),
requirements.BooleanRequirement(
name="scan-processes",
description="Scan each process address space",
default=False,
optional=True,
),
]

@classmethod
def virtmap_cache_file_producer(
cls,
scans_results: dict,
open_method: Type[interfaces.plugins.FileHandlerInterface],
filename: str = "virtmapcache.json.xz",
) -> str:
"""Dumps scanning results into a JSON string,
compresses it and writes it to disk.

Args:
scans_results: the layers scans results
open_method: class to provide context manager for opening the file
filename: the filename to use when dumping the file to disk

Returns:
The preferred filename of the written virtmap cache file
"""

file_handle = open_method(filename)
json_data = json.dumps(scans_results).encode()
xz_data = lzma.compress(json_data)
file_handle.write(xz_data)
file_handle.close()

return file_handle.preferred_filename

@classmethod
def virtmap_cache_scanner(
cls,
layer: interfaces.layers.DataLayerInterface,
sections: Iterable[Tuple[int, int]],
progress_callback: constants.ProgressCallback = None,
) -> dict:
"""Scans the provided layer sections

Args:
layer: the layer to scan
sections: the sections to scan on the layer
progress_callback: optional method that is called back to report scan progress

Returns:
A dictionary mapping each section to the section scan result
"""
layer_results = {}
scanner = BytesScanner("")
for section in sections:
scan_iterator = functools.partial(layer._scan_iterator, scanner, [section])
scan_metric = layer._scan_metric(scanner, [section])
scan_values = []
try:
for value in scan_iterator():
scan_values.append(value)
if progress_callback:
progress_callback(
scan_metric(value[1]),
f"Scanning {layer.name} using {scanner.__class__.__name__}",
)
except Exception as e:
vollog.debug(f"Scan Failure: {str(e)}")
vollog.log(
constants.LOGLEVEL_VVV,
"\n".join(
traceback.TracebackException.from_exception(e).format(
chain=True
)
),
)

layer_results[str(section)] = scan_values

return layer_results

@classmethod
def virtmap_cache_producer(
cls,
layers_sections: Dict[
interfaces.layers.DataLayerInterface, Iterable[Tuple[int, int]]
],
progress_callback: constants.ProgressCallback = None,
) -> dict:
"""Scans a list of layers and sections

Args:
layers_sections: a dictionary containing layers and a list of sections to scan on each layer
progress_callback: optional method that is called back to report scan progress

Returns:
A dictionary mapping each layer identifier to the corresponding scan result
"""
layers_results = {}

for layer, sections in layers_sections.items():
layer_results = cls.virtmap_cache_scanner(
layer, sections, progress_callback
)
# Clearly identify this layer, by concatenating the layer class and the layer name
layer_identifier = path_join(layer.config["class"], layer.name)
layers_results[layer_identifier] = layer_results

return layers_results

def _generator(self):
kernel = self.context.modules[self.config["kernel"]]
kernel_layer = self.context.layers[kernel.layer_name]
layers_sections = {}
layers_sections[kernel_layer] = [
(
kernel_layer.minimum_address,
kernel_layer.maximum_address - kernel_layer.minimum_address,
)
]
if self.config["scan-processes"]:
for proc in pslist.PsList.list_processes(
context=self.context,
layer_name=kernel.layer_name,
symbol_table=kernel.symbol_table_name,
):
proc_id = "Unknown"
try:
proc_id = proc.UniqueProcessId
proc_layer_name = proc.add_process_layer()
except exceptions.InvalidAddressException as excp:
vollog.debug(
"Process {}: invalid address {} in layer {}".format(
proc_id, excp.invalid_address, excp.layer_name
)
)
continue

proc_layer = self.context.layers[proc_layer_name]
layers_sections[proc_layer] = [
(
proc_layer.minimum_address,
proc_layer.maximum_address - proc_layer.minimum_address,
)
]

layers_results = self.virtmap_cache_producer(
layers_sections, self._progress_callback
)
virtmapcache_filename = self.virtmap_cache_file_producer(
layers_results, self.open
)

res = (
0,
(virtmapcache_filename,),
)
yield res

def run(self):
return renderers.TreeGrid(
[
("Virtual mappings cache file output", str),
],
self._generator(),
)