diff --git a/volatility3/framework/constants/__init__.py b/volatility3/framework/constants/__init__.py index 8743a64b0b..27fae4ba1f 100644 --- a/volatility3/framework/constants/__init__.py +++ b/volatility3/framework/constants/__init__.py @@ -134,4 +134,5 @@ def __getattr__(name): ]: warnings.warn(f"{name} is deprecated", FutureWarning) return globals()[f"{deprecated_tag}{name}"] - return None + + return getattr(__import__(__name__), name) diff --git a/volatility3/framework/interfaces/configuration.py b/volatility3/framework/interfaces/configuration.py index 3bb3cb0193..da0a4556c6 100644 --- a/volatility3/framework/interfaces/configuration.py +++ b/volatility3/framework/interfaces/configuration.py @@ -494,8 +494,7 @@ def unsatisfied( """Validates the instance requirement based upon its `instance_type`.""" config_path = path_join(config_path, self.name) - - value = self.config_value(context, config_path, None) + value = self.config_value(context, config_path, self.default) if not isinstance(value, self.instance_type): vollog.log( constants.LOGLEVEL_V, @@ -536,7 +535,7 @@ def unsatisfied( """Checks to see if a class can be recovered.""" config_path = path_join(config_path, self.name) - value = self.config_value(context, config_path, None) + value = self.config_value(context, config_path, self.default) self._cls = None if value is not None and isinstance(value, str): if "." in value: diff --git a/volatility3/framework/plugins/linux/check_creds.py b/volatility3/framework/plugins/linux/check_creds.py index ab6ee4935c..b7f73c3eb0 100644 --- a/volatility3/framework/plugins/linux/check_creds.py +++ b/volatility3/framework/plugins/linux/check_creds.py @@ -2,20 +2,19 @@ # which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 # -import logging - from volatility3.framework import interfaces, renderers +from volatility3.framework.renderers import format_hints from volatility3.framework.configuration import requirements from volatility3.plugins.linux import pslist -vollog = logging.getLogger(__name__) - class Check_creds(interfaces.plugins.PluginInterface): """Checks if any processes are sharing credential structures""" _required_framework_version = (2, 0, 0) + _version = (2, 0, 0) + @classmethod def get_requirements(cls): return [ @@ -46,20 +45,28 @@ def _generator(self): tasks = pslist.PsList.list_tasks(self.context, vmlinux.name) for task in tasks: - cred_addr = task.cred.dereference().vol.offset + task_cred_ptr = task.cred + if not (task_cred_ptr and task_cred_ptr.is_readable()): + continue - if cred_addr not in creds: - creds[cred_addr] = [] + cred_addr = task_cred_ptr.dereference().vol.offset + creds.setdefault(cred_addr, []) creds[cred_addr].append(task.pid) - for _, pids in creds.items(): + for cred_addr, pids in creds.items(): if len(pids) > 1: - pid_str = "" - for pid in pids: - pid_str = pid_str + f"{pid:d}, " - pid_str = pid_str[:-2] - yield (0, [str(pid_str)]) + pid_str = ", ".join([str(pid) for pid in pids]) + + fields = [ + format_hints.Hex(cred_addr), + pid_str, + ] + yield (0, fields) def run(self): - return renderers.TreeGrid([("PIDs", str)], self._generator()) + headers = [ + ("CredVAddr", format_hints.Hex), + ("PIDs", str), + ] + return renderers.TreeGrid(headers, self._generator()) diff --git a/volatility3/framework/plugins/linux/malfind.py b/volatility3/framework/plugins/linux/malfind.py index cf06ee0ccb..18f3dcd56b 100644 --- a/volatility3/framework/plugins/linux/malfind.py +++ b/volatility3/framework/plugins/linux/malfind.py @@ -5,7 +5,7 @@ from typing 
import List import logging from volatility3.framework import constants, interfaces -from volatility3.framework import renderers +from volatility3.framework import renderers, symbols from volatility3.framework.configuration import requirements from volatility3.framework.objects import utility from volatility3.framework.renderers import format_hints @@ -63,15 +63,9 @@ def _list_injections(self, task): def _generator(self, tasks): # determine if we're on a 32 or 64 bit kernel vmlinux = self.context.modules[self.config["kernel"]] - if ( - self.context.symbol_space.get_type( - vmlinux.symbol_table_name + constants.BANG + "pointer" - ).size - == 4 - ): - is_32bit_arch = True - else: - is_32bit_arch = False + is_32bit_arch = not symbols.symbol_table_is_64bit( + self.context, vmlinux.symbol_table_name + ) for task in tasks: process_name = utility.array_to_string(task.comm) diff --git a/volatility3/framework/plugins/linux/pidhashtable.py b/volatility3/framework/plugins/linux/pidhashtable.py index 3223aed4a3..edafe97e05 100644 --- a/volatility3/framework/plugins/linux/pidhashtable.py +++ b/volatility3/framework/plugins/linux/pidhashtable.py @@ -20,7 +20,7 @@ class PIDHashTable(plugins.PluginInterface): _required_framework_version = (2, 0, 0) - _version = (1, 0, 0) + _version = (1, 0, 1) @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: @@ -45,9 +45,7 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] ] def _is_valid_task(self, task) -> bool: - vmlinux = self.context.modules[self.config["kernel"]] - vmlinux_layer = self.context.layers[vmlinux.layer_name] - return bool(task and task.pid > 0 and vmlinux_layer.is_valid(task.parent)) + return bool(task and task.pid > 0 and task.parent.is_readable()) def _get_pidtype_pid(self): vmlinux = self.context.modules[self.config["kernel"]] @@ -96,7 +94,7 @@ def _walk_upid(self, seen_upids, upid): seen_upids.add(upid.vol.offset) pid_chain = upid.pid_chain - if not (pid_chain and vmlinux_layer.is_valid(pid_chain.vol.offset)): + if not (pid_chain.next and pid_chain.next.is_readable()): break upid = linux.LinuxUtilities.container_of( @@ -105,7 +103,6 @@ def _walk_upid(self, seen_upids, upid): def _get_upids(self): vmlinux = self.context.modules[self.config["kernel"]] - vmlinux_layer = self.context.layers[vmlinux.layer_name] # 2.6.24 <= kernels < 4.15 pidhash = self._get_pidhash_array() @@ -115,7 +112,7 @@ def _get_upids(self): # each entry in the hlist is a upid which is wrapped in a pid ent = hlist.first - while ent and vmlinux_layer.is_valid(ent.vol.offset): + while ent and ent.is_readable(): # upid->pid_chain exists 2.6.24 <= kernel < 4.15 upid = linux.LinuxUtilities.container_of( ent.vol.offset, "upid", "pid_chain", vmlinux @@ -143,7 +140,7 @@ def _pid_hash_implementation(self): continue pid_tasks_0 = pid.tasks[pidtype_pid].first - if not pid_tasks_0: + if not (pid_tasks_0 and pid_tasks_0.is_readable()): continue task = vmlinux.object( @@ -160,7 +157,7 @@ def _task_for_radix_pid_node(self, nodep): pidtype_pid = self._get_pidtype_pid() pid_tasks_0 = pid.tasks[pidtype_pid].first - if not pid_tasks_0: + if not (pid_tasks_0 and pid_tasks_0.is_readable()): return None task_struct_type = vmlinux.get_type("task_struct") diff --git a/volatility3/framework/plugins/linux/sockstat.py b/volatility3/framework/plugins/linux/sockstat.py index c76e0a6b49..f75f73346b 100644 --- a/volatility3/framework/plugins/linux/sockstat.py +++ b/volatility3/framework/plugins/linux/sockstat.py @@ -22,7 +22,7 @@ class 
SockHandlers(interfaces.configuration.VersionableInterface): _required_framework_version = (2, 0, 0) - _version = (1, 0, 1) + _version = (3, 0, 0) def __init__(self, vmlinux, task, *args, **kwargs): super().__init__(*args, **kwargs) @@ -439,7 +439,7 @@ class Sockstat(plugins.PluginInterface): _required_framework_version = (2, 0, 0) - _version = (2, 0, 0) + _version = (3, 0, 0) @classmethod def get_requirements(cls): @@ -450,7 +450,7 @@ def get_requirements(cls): architectures=["Intel32", "Intel64"], ), requirements.VersionRequirement( - name="SockHandlers", component=SockHandlers, version=(1, 0, 0) + name="SockHandlers", component=SockHandlers, version=(3, 0, 0) ), requirements.PluginRequirement( name="lsof", plugin=lsof.Lsof, version=(2, 0, 0) @@ -608,6 +608,8 @@ def _generator(self, pids: List[int], netns_id_arg: int, symbol_table: str): if netns_id_arg and netns_id_arg != netns_id: continue + task_comm = utility.array_to_string(task.comm) + sock, sock_stat, extended = sock_fields sock_stat, protocol = self._format_fields(sock_stat, protocol) @@ -619,6 +621,7 @@ def _generator(self, pids: List[int], netns_id_arg: int, symbol_table: str): fields = ( netns_id, + task_comm, task.tgid, task.pid, fd_num, @@ -639,6 +642,7 @@ def run(self): tree_grid_args = [ ("NetNS", int), + ("Process Name", str), ("PID", int), ("TID", int), ("FD", int), diff --git a/volatility3/framework/plugins/timeliner.py b/volatility3/framework/plugins/timeliner.py index f657a29183..d1cb9f4601 100644 --- a/volatility3/framework/plugins/timeliner.py +++ b/volatility3/framework/plugins/timeliner.py @@ -45,6 +45,7 @@ class Timeliner(interfaces.plugins.PluginInterface): orders the results by time.""" _required_framework_version = (2, 0, 0) + _version = (1, 1, 0) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -198,9 +199,10 @@ def _generator( ), ) ) - except Exception: + except Exception as e: vollog.log( - logging.INFO, f"Exception occurred running plugin: {plugin_name}" + logging.INFO, + f"Exception occurred running plugin: {plugin_name}: {e}", ) vollog.log(logging.DEBUG, traceback.format_exc()) @@ -245,6 +247,18 @@ def run(self): filter_list = self.config["plugin-filter"] # Identify plugins that we can run which output datetimes for plugin_class in self.usable_plugins: + if not issubclass(plugin_class, TimeLinerInterface): + # get_usable_plugins() should filter this, but adding a safeguard just in case + continue + + if filter_list and not any( + [ + filter in plugin_class.__module__ + "." + plugin_class.__name__ + for filter in filter_list + ] + ): + continue + try: automagics = automagic.choose_automagic(self.automagics, plugin_class) @@ -276,15 +290,8 @@ def run(self): config_value, ) - if isinstance(plugin, TimeLinerInterface): - if not len(filter_list) or any( - [ - filter - in plugin.__module__ + "." 
+ plugin.__class__.__name__ - for filter in filter_list - ] - ): - plugins_to_run.append(plugin) + plugins_to_run.append(plugin) + except exceptions.UnsatisfiedException as excp: # Remove the failed plugin from the list and continue vollog.debug( diff --git a/volatility3/framework/plugins/windows/callbacks.py b/volatility3/framework/plugins/windows/callbacks.py index d5eeda1ea3..562846def5 100644 --- a/volatility3/framework/plugins/windows/callbacks.py +++ b/volatility3/framework/plugins/windows/callbacks.py @@ -248,8 +248,12 @@ def scan( context, layer_name, nt_symbol_table, constraints ): try: - if hasattr(mem_object, "is_valid") and not mem_object.is_valid(): - continue + if isinstance(mem_object, callbacks._SHUTDOWN_PACKET): + if not mem_object.is_parseable(type_map): + continue + elif hasattr(mem_object, "is_valid"): + if not mem_object.is_valid(): + continue yield cls._process_scanned_callback(mem_object, type_map) except exceptions.InvalidAddressException: diff --git a/volatility3/framework/plugins/windows/debugregisters.py b/volatility3/framework/plugins/windows/debugregisters.py new file mode 100644 index 0000000000..57dd1822c9 --- /dev/null +++ b/volatility3/framework/plugins/windows/debugregisters.py @@ -0,0 +1,211 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 + +# Full details on the techniques used in these plugins to detect EDR-evading malware +# can be found in our 20 page whitepaper submitted to DEFCON along with the presentation +# https://www.volexity.com/wp-content/uploads/2024/08/Defcon24_EDR_Evasion_Detection_White-Paper_Andrew-Case.pdf + +import logging + +from typing import Tuple, Optional, Generator, List, Dict + +from functools import partial + +from volatility3.framework import renderers, interfaces, exceptions +from volatility3.framework.configuration import requirements +from volatility3.framework.renderers import format_hints +import volatility3.plugins.windows.pslist as pslist +import volatility3.plugins.windows.threads as threads +import volatility3.plugins.windows.pe_symbols as pe_symbols + +vollog = logging.getLogger(__name__) + + +class DebugRegisters(interfaces.plugins.PluginInterface): + # version 2.6.0 adds support for scanning for 'Ethread' structures by pool tags + _required_framework_version = (2, 6, 0) + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls) -> List: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(2, 0, 0) + ), + requirements.VersionRequirement( + name="pe_symbols", component=pe_symbols.PESymbols, version=(1, 0, 0) + ), + ] + + @staticmethod + def _get_debug_info( + ethread: interfaces.objects.ObjectInterface, + ) -> Optional[Tuple[interfaces.objects.ObjectInterface, int, int, int, int, int]]: + """ + Gathers information related to the debug registers for the given thread + Args: + ethread: the thread (_ETHREAD) to examine + Returns: + Tuple[interfaces.objects.ObjectInterface, int, int, int, int, int]: The owner process of the thread and the values for dr7, dr0, dr1, dr2, dr3 + """ + try: + dr7 = ethread.Tcb.TrapFrame.Dr7 + state = ethread.Tcb.State + except exceptions.InvalidAddressException: + return None + + # 0 = debug registers not active + # 4 = terminated + if dr7 == 0 or state == 4: + return None + 
+ try: + owner_proc = ethread.owning_process() + except (AttributeError, exceptions.InvalidAddressException): + return None + + dr0 = ethread.Tcb.TrapFrame.Dr0 + dr1 = ethread.Tcb.TrapFrame.Dr1 + dr2 = ethread.Tcb.TrapFrame.Dr2 + dr3 = ethread.Tcb.TrapFrame.Dr3 + + # bail if all are 0 + if not (dr0 or dr1 or dr2 or dr3): + return None + + return owner_proc, dr7, dr0, dr1, dr2, dr3 + + def _generator( + self, + ) -> Generator[ + Tuple[ + int, + Tuple[ + str, + int, + int, + int, + int, + format_hints.Hex, + str, + str, + format_hints.Hex, + str, + str, + format_hints.Hex, + str, + str, + format_hints.Hex, + str, + str, + ], + ], + None, + None, + ]: + kernel = self.context.modules[self.config["kernel"]] + + vads_cache: Dict[int, pe_symbols.ranges_type] = {} + + proc_modules = None + + procs = pslist.PsList.list_processes( + context=self.context, + layer_name=kernel.layer_name, + symbol_table=kernel.symbol_table_name, + ) + + for proc in procs: + for thread in threads.Threads.list_threads(kernel, proc): + debug_info = self._get_debug_info(thread) + if not debug_info: + continue + + owner_proc, dr7, dr0, dr1, dr2, dr3 = debug_info + + vads = pe_symbols.PESymbols.get_vads_for_process_cache( + vads_cache, owner_proc + ) + if not vads: + continue + + # this lookup takes a while, so only perform if we need to + if not proc_modules: + proc_modules = pe_symbols.PESymbols.get_process_modules( + self.context, kernel.layer_name, kernel.symbol_table_name, None + ) + path_and_symbol = partial( + pe_symbols.PESymbols.path_and_symbol_for_address, + self.context, + self.config_path, + proc_modules, + ) + + file0, sym0 = path_and_symbol(vads, dr0) + file1, sym1 = path_and_symbol(vads, dr1) + file2, sym2 = path_and_symbol(vads, dr2) + file3, sym3 = path_and_symbol(vads, dr3) + + # if none map to an actual file VAD then bail + if not (file0 or file1 or file2 or file3): + continue + + process_name = owner_proc.ImageFileName.cast( + "string", + max_length=owner_proc.ImageFileName.vol.count, + errors="replace", + ) + + thread_tid = thread.Cid.UniqueThread + + yield ( + 0, + ( + process_name, + owner_proc.UniqueProcessId, + thread_tid, + thread.Tcb.State, + dr7, + format_hints.Hex(dr0), + file0 or renderers.NotApplicableValue(), + sym0 or renderers.NotApplicableValue(), + format_hints.Hex(dr1), + file1 or renderers.NotApplicableValue(), + sym1 or renderers.NotApplicableValue(), + format_hints.Hex(dr2), + file2 or renderers.NotApplicableValue(), + sym2 or renderers.NotApplicableValue(), + format_hints.Hex(dr3), + file3 or renderers.NotApplicableValue(), + sym3 or renderers.NotApplicableValue(), + ), + ) + + def run(self) -> renderers.TreeGrid: + return renderers.TreeGrid( + [ + ("Process", str), + ("PID", int), + ("TID", int), + ("State", int), + ("Dr7", int), + ("Dr0", format_hints.Hex), + ("Range0", str), + ("Symbol0", str), + ("Dr1", format_hints.Hex), + ("Range1", str), + ("Symbol1", str), + ("Dr2", format_hints.Hex), + ("Range2", str), + ("Symbol2", str), + ("Dr3", format_hints.Hex), + ("Range3", str), + ("Symbol3", str), + ], + self._generator(), + ) diff --git a/volatility3/framework/plugins/windows/netscan.py b/volatility3/framework/plugins/windows/netscan.py index 868bd8bcd1..66a24da5ac 100644 --- a/volatility3/framework/plugins/windows/netscan.py +++ b/volatility3/framework/plugins/windows/netscan.py @@ -76,7 +76,7 @@ def create_netscan_constraints( # ~ vollog.debug("Using pool size constraints: TcpL {}, TcpE {}, UdpA {}".format(tcpl_size, tcpe_size, udpa_size)) - return [ + constraints = [ # TCP 
listener poolscanner.PoolConstraint( b"TcpL", @@ -100,6 +100,19 @@ def create_netscan_constraints( ), ] + if symbol_table.startswith("netscan-win10-20348"): + vollog.debug("Adding additional pool constraint for `TTcb` tags") + constraints.append( + poolscanner.PoolConstraint( + b"TTcb", + type_name=symbol_table + constants.BANG + "_TCP_ENDPOINT", + size=(tcpe_size, None), + page_type=poolscanner.PoolType.NONPAGED | poolscanner.PoolType.FREE, + ) + ) + + return constraints + @classmethod def determine_tcpip_version( cls, diff --git a/volatility3/framework/plugins/windows/pe_symbols.py b/volatility3/framework/plugins/windows/pe_symbols.py new file mode 100644 index 0000000000..955098d6bf --- /dev/null +++ b/volatility3/framework/plugins/windows/pe_symbols.py @@ -0,0 +1,1017 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 + +import copy +import io +import logging +import ntpath + +from typing import Dict, Tuple, Optional, List, Generator, Union, Callable + +import pefile + +from volatility3.framework import interfaces, exceptions +from volatility3.framework import renderers, constants +from volatility3.framework.configuration import requirements +from volatility3.framework.renderers import format_hints +from volatility3.framework.symbols import intermed +from volatility3.framework.symbols.windows import pdbutil +from volatility3.framework.symbols.windows.extensions import pe +from volatility3.plugins.windows import pslist, modules +from volatility3.framework.constants.windows import KERNEL_MODULE_NAMES + +vollog = logging.getLogger(__name__) + +# keys for specifying wanted names and/or addresses +# used for consistent access between the API and plugins +wanted_names_identifier = "names" +wanted_addresses_identifier = "addresses" + +# how wanted modules/symbols are specified, such as: +# {"ntdll.dll" : {wanted_addresses : [42, 43, 43]}} +# {"ntdll.dll" : {wanted_names : ["NtCreateThread"]}} +filter_module_info = Union[Dict[str, List[str]], Dict[str, List[int]]] +filter_modules_type = Dict[str, filter_module_info] + +# holds resolved symbols +# {"ntdll.dll": [("Bob", 123), ("Alice", 456)]} +found_symbols_module = List[Tuple[str, int]] +found_symbols_type = Dict[str, found_symbols_module] + +# used to hold informatin about a range (VAD or kernel module) +# (start address, size, file path) +range_type = Tuple[int, int, str] +ranges_type = List[range_type] + +# collected_modules are modules and their symbols found when walking vads or kernel modules +# Tuple of (process or kernel layer name, range start, range size) +collected_module_instance = Tuple[str, int, int] +collected_modules_info = List[collected_module_instance] +collected_modules_type = Dict[str, collected_modules_info] + +PESymbolFinders = Union[interfaces.context.ModuleInterface, pefile.ExportDirData] + + +class PESymbolFinder: + """ + Interface for PE symbol finding classes + This interface provides a standard way for the calling code to + lookup symbols by name or address + """ + + cached_str_dict = Dict[str, Optional[str]] + + cached_int_dict = Dict[str, Optional[int]] + + cached_value = Union[int, str, None] + cached_module_lists = Union[Dict[str, List[str]], Dict[str, List[int]]] + cached_value_dict = Dict[str, cached_module_lists] + + def __init__( + self, + layer_name: str, + mod_name: str, + module_start: int, + symbol_module: PESymbolFinders, + ): + self._layer_name = layer_name + 
self._mod_name = mod_name + self._module_start = module_start + self._symbol_module = symbol_module + + self._address_cache: PESymbolFinder.cached_int_dict = {} + self._name_cache: PESymbolFinder.cached_str_dict = {} + + def _get_cache_key(self, value: cached_value) -> str: + """ + Maintain a cache for symbol lookups to avoid re-walking of PDB symbols or export tables + within the same module for the same address in the same layer + + Args: + value: The value (address or name) being cached + + Returns: + str: The constructed cache key that includes the layer and module name + """ + return f"{self._layer_name}|{self._mod_name}|{value}" + + def get_name_for_address(self, address: int) -> Optional[str]: + """ + Returns the name for the given address within the particular layer and module + + Args: + address: the address to resolve within the module + + Returns: + str: the name of the symbol, if found + """ + cached_key = self._get_cache_key(address) + if cached_key not in self._name_cache: + name = self._do_get_name(address) + self._name_cache[cached_key] = name + + return self._name_cache[cached_key] + + def get_address_for_name(self, name: str) -> Optional[int]: + """ + Returns the address for the given name within the particular layer and module + + Args: + str: the name of the symbol to resolve + + Returns: + address: the address of the symbol, if found + """ + cached_key = self._get_cache_key(name) + if cached_key not in self._address_cache: + address = self._do_get_address(name) + self._address_cache[cached_key] = address + + return self._address_cache[cached_key] + + def _do_get_name(self, address: int) -> Optional[str]: + """ + Returns the name for the given address within the particular layer and module. + This method must be overwritten by subclasses. + + Args: + address: the address to resolve within the module + + Returns: + str: the name of the symbol, if found + """ + raise NotImplementedError("_do_get_name must be overwritten") + + def _do_get_address(self, name: str) -> Optional[int]: + """ + Returns the address for the given name within the particular layer and module + This method must be overwritten by subclasses.
+ + Args: + str: the name of the symbol to resolve + + Returns: + address: the address of the symbol, if found + """ + raise NotImplementedError("_do_get_address must be overwritten") + + +class PDBSymbolFinder(PESymbolFinder): + """ + PESymbolFinder implementation for PDB modules + """ + + def _do_get_address(self, name: str) -> Optional[int]: + """ + _do_get_address implementation for PDBSymbolFinder + + Args: + str: the name of the symbol to resolve + + Returns: + address: the address of the symbol, if found + """ + try: + return self._symbol_module.get_absolute_symbol_address(name) + except exceptions.SymbolError: + return None + + def _do_get_name(self, address: int) -> Optional[str]: + """ + _do_get_name implementation for PDBSymbolFinder + + Args: + address: the address to resolve within the module + + Returns: + str: the name of the symbol, if found + """ + try: + name = self._symbol_module.get_symbols_by_absolute_location(address)[0] + return name.split(constants.BANG)[1] + except (exceptions.SymbolError, IndexError): + return None + + +class ExportSymbolFinder(PESymbolFinder): + """ + PESymbolFinder implementation for export tables + """ + + def _get_name(self, export: pefile.ExportData) -> Optional[str]: + # AttributeError throws on empty or ordinal-only exports + try: + return export.name.decode("ascii") + except AttributeError: + return None + + def _do_get_name(self, address: int) -> Optional[str]: + """ + _do_get_name implementation for ExportSymbolFinder + + Args: + address: the address to resolve within the module + + Returns: + str: the name of the symbol, if found + """ + for export in self._symbol_module: + if export.address + self._module_start == address: + return self._get_name(export) + + return None + + def _do_get_address(self, name: str) -> Optional[int]: + """ + _do_get_address implementation for ExportSymbolFinder + Args: + str: the name of the symbol to resolve + + Returns: + address: the address of the symbol, if found + """ + + for export in self._symbol_module: + sym_name = self._get_name(export) + if sym_name and sym_name == name: + return self._module_start + export.address + + return None + + +class PESymbols(interfaces.plugins.PluginInterface): + """Prints symbols in PE files in process and kernel memory""" + + _required_framework_version = (2, 7, 0) + + _version = (1, 0, 0) + + # used for special handling of the kernel PDB file. See later notes + os_module_name = "ntoskrnl.exe" + + @classmethod + def get_requirements(cls) -> List: + # Since we're calling the plugin, make sure we have the plugin's requirements + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(2, 0, 0) + ), + requirements.VersionRequirement( + name="modules", component=modules.Modules, version=(2, 0, 0) + ), + requirements.VersionRequirement( + name="pdbutil", component=pdbutil.PDBUtility, version=(1, 0, 0) + ), + requirements.ChoiceRequirement( + name="source", + description="Where to resolve symbols.", + choices=["kernel", "processes"], + optional=False, + ), + requirements.StringRequirement( + name="module", + description='Module in which to resolve symbols. 
Use "ntoskrnl.exe" to resolve in the base kernel executable.', + optional=False, + ), + requirements.ListRequirement( + name="symbols", + element_type=str, + description="Symbol name to resolve", + optional=True, + ), + requirements.ListRequirement( + name="addresses", + element_type=int, + description="Address of symbol to resolve", + optional=True, + ), + ] + + @staticmethod + def _get_pefile_obj( + context: interfaces.context.ContextInterface, + pe_table_name: str, + layer_name: str, + base_address: int, + ) -> Optional[pefile.PE]: + """ + Attempts to pefile object from the bytes of the PE file + + Args: + pe_table_name: name of the pe types table + layer_name: name of the process layer + base_address: base address of the module + + Returns: + the constructed pefile object + """ + pe_data = io.BytesIO() + + try: + dos_header = context.object( + pe_table_name + constants.BANG + "_IMAGE_DOS_HEADER", + offset=base_address, + layer_name=layer_name, + ) + + for offset, data in dos_header.reconstruct(): + pe_data.seek(offset) + pe_data.write(data) + + pe_ret = pefile.PE(data=pe_data.getvalue(), fast_load=True) + + except exceptions.InvalidAddressException: + pe_ret = None + + return pe_ret + + @staticmethod + def range_info_for_address( + ranges: ranges_type, address: int + ) -> Optional[range_type]: + """ + Helper for getting the range information for an address. + Finds the range holding the `address` parameter + + Args: + address: the address to find the range for + + Returns: + Tuple[int, int, str]: The starting address, size, and file path of the range + + """ + for start, size, filepath in ranges: + if start <= address < start + size: + return start, size, filepath + + return None + + @staticmethod + def filepath_for_address(ranges: ranges_type, address: int) -> Optional[str]: + """ + Helper to get the file path for an address + + Args: + ranges: The set of VADs with mapped files to find the address + address: The address to find inside of the VADs set + + Returns: + str: The full path of the file, if found and present + """ + info = PESymbols.range_info_for_address(ranges, address) + if info: + return info[2] + + return None + + @staticmethod + def filename_for_path(filepath: str) -> str: + """ + Consistent way to get the filename regardless of platform + + Args: + str: the file path from `filepath_for_address` + + Returns: + str: the bsae file name of the full path + """ + return ntpath.basename(filepath).lower() + + @staticmethod + def addresses_for_process_symbols( + context: interfaces.context.ContextInterface, + config_path: str, + layer_name: str, + symbol_table_name: str, + symbols: filter_modules_type, + ) -> found_symbols_type: + """ + Used to easily resolve the addresses of names inside of modules. + + See the usage of this function for system call resolution in unhooked_system_calls.py + for an easy to understand example. 
+ + Args: + symbols: The dictionary of symbols requested by the caller + + Returns: + found_symbols_type: The dictionary of symbols that were resolved + """ + collected_modules = PESymbols.get_process_modules( + context, layer_name, symbol_table_name, symbols + ) + + found_symbols, missing_symbols = PESymbols.find_symbols( + context, config_path, symbols, collected_modules + ) + + for mod_name, unresolved_symbols in missing_symbols.items(): + for symbol in unresolved_symbols: + vollog.debug(f"Unable to resolve symbol {symbol} in module {mod_name}") + + return found_symbols + + @staticmethod + def path_and_symbol_for_address( + context: interfaces.context.ContextInterface, + config_path: str, + collected_modules: collected_modules_type, + ranges: ranges_type, + address: int, + ) -> Tuple[Optional[str], Optional[str]]: + """ + Method for plugins to determine the file path and symbol name for a given address + + See debugregisters.py for an example of how this function is used along with get_vads_for_process_cache + for resolving symbols in processes. + + Args: + collected_modules: return value from `get_kernel_modules` or `get_process_modules` + ranges: the memory ranges to examine in this layer. + address: address to resolve to its symbol name + Returns: + Tuple[str|renderers.NotApplicableValue|renderers.NotAvailableValue, str|renderers.NotApplicableValue|renderers.NotAvailableValue] + """ + if not address: + return None, None + + filepath = PESymbols.filepath_for_address(ranges, address) + + if not filepath: + return None, None + + filename = PESymbols.filename_for_path(filepath).lower() + + # setup to resolve the address + filter_module: filter_modules_type = { + filename: {wanted_addresses_identifier: [address]} + } + + found_symbols, _missing_symbols = PESymbols.find_symbols( + context, config_path, filter_module, collected_modules + ) + + if not found_symbols or filename not in found_symbols: + return filepath, None + + return filepath, found_symbols[filename][0][0] + + @staticmethod + def _get_exported_symbols( + context: interfaces.context.ContextInterface, + pe_table_name: str, + mod_name: str, + module_info: collected_module_instance, + ) -> Optional[ExportSymbolFinder]: + """ + Attempts to locate symbols based on export analysis + + Args: + mod_name: lower case name of the module to resolve symbols in + module_info: (layer_name, module_start, module_size) of the module to examine + + Returns: + Optional[ExportSymbolFinder]: If the export table can be resolved, then the ExportSymbolFinder + instance for it + """ + + layer_name = module_info[0] + module_start = module_info[1] + + # we need a valid PE with an export table + pe_module = PESymbols._get_pefile_obj( + context, pe_table_name, layer_name, module_start + ) + if not pe_module: + return None + + pe_module.parse_data_directories( + directories=[pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_EXPORT"]] + ) + if not hasattr(pe_module, "DIRECTORY_ENTRY_EXPORT"): + return None + + return ExportSymbolFinder( + layer_name, + mod_name.lower(), + module_start, + pe_module.DIRECTORY_ENTRY_EXPORT.symbols, + ) + + @staticmethod + def _get_pdb_module( + context: interfaces.context.ContextInterface, + config_path: str, + mod_name: str, + module_info: collected_module_instance, + ) -> Optional[PDBSymbolFinder]: + """ + Attempts to locate symbols based on PDB analysis through each layer where the mod_name module was found + + Args: + mod_name: lower case name of the module to resolve symbols in + module_info: (layer_name, module_start, 
module_size) of the module to examine + + Returns: + Optional[PDBSymbolFinder]: If the PDB can be resolved, then the PDBSymbolFinder instance for it + """ + + mod_symbols = None + + layer_name, module_start, module_size = module_info + + # the PDB name of the kernel file is not consistent for an exe, for example, + # a `ntoskrnl.exe` can have an internal PDB name of any of the ones in the following list + # The code attempts to find all possible PDBs to ensure the best chance of recovery + if mod_name == PESymbols.os_module_name: + pdb_names = [fn + ".pdb" for fn in KERNEL_MODULE_NAMES] + + # for non-kernel files, replace the exe, sys, or dll extension with pdb + else: + # in testing we found that some DLLs, such as amsi.dll, have their PDB string as Amsi.dll + # in certain Windows versions + mod_name = mod_name[:-3] + "pdb" + first_upper = mod_name[0].upper() + mod_name[1:] + pdb_names = [mod_name, first_upper] + + # loop through each PDB name (all the kernel names or the dll name as lower() + first char upper case) + for pdb_name in pdb_names: + try: + mod_symbols = pdbutil.PDBUtility.symbol_table_from_pdb( + context, + interfaces.configuration.path_join(config_path, mod_name), + layer_name, + pdb_name, + module_start, + module_size, + ) + + if mod_symbols: + break + + # this exception is expected when the PDB can't be found or downloaded + except exceptions.VolatilityException: + continue + + # this is not expected - it means pdbconv broke when parsing the PDB + except TypeError as e: + vollog.error( + f"Unable to parse PDB file for module {pdb_name} -> {e}. Please file a bug on the GitHub issue tracker." + ) + + # cannot do anything without the symbols + if not mod_symbols: + return None + + pdb_module = context.module( + mod_symbols, layer_name=layer_name, offset=module_start + ) + + return PDBSymbolFinder(layer_name, mod_name, module_start, pdb_module) + + @staticmethod + def _find_symbols_through_pdb( + context: interfaces.context.ContextInterface, + config_path: str, + module_instances: collected_modules_info, + mod_name: str, + ) -> Generator[PDBSymbolFinder, None, None]: + """ + Attempts to resolve the symbols in `mod_name` through PDB analysis + + Args: + module_instances: the set of layers in which the module was found + mod_name: name of the module to resolve symbols in + Returns: + Generator[PDBSymbolFinder]: a PDBSymbolFinder instance for each layer in which the module was found + """ + for module_info in module_instances: + mod_module = PESymbols._get_pdb_module( + context, config_path, mod_name, module_info + ) + if mod_module: + yield mod_module + + @staticmethod + def _find_symbols_through_exports( + context: interfaces.context.ContextInterface, + config_path: str, + module_instances: collected_modules_info, + mod_name: str, + ) -> Generator[ExportSymbolFinder, None, None]: + """ + Attempts to resolve the symbols in `mod_name` through export analysis + + Args: + module_instances: the set of layers in which the module was found + mod_name: name of the module to resolve symbols in + Returns: + Generator[ExportSymbolFinder]: an ExportSymbolFinder instance for each layer in which the module was found + """ + pe_table_name = intermed.IntermediateSymbolTable.create( + context, config_path, "windows", "pe", class_types=pe.class_types + ) + + # for each process layer and VAD, construct a PE and examine the export table + for module_info in module_instances: + exported_symbols = PESymbols._get_exported_symbols( + context, pe_table_name, mod_name, module_info + ) + if exported_symbols: + 
yield exported_symbols + + @staticmethod + def _get_symbol_value( + wanted_symbols: filter_module_info, + symbol_resolver: PESymbolFinder, + ) -> Generator[Tuple[str, int, str, int], None, None]: + """ + Enumerates the symbols specified as wanted by the calling plugin + + Args: + wanted_symbols: the set of symbols for a particular module + symbol_resolver: method in a layer to resolve the symbols + + Returns: + Tuple[str, int, str, int]: the index and value of the found symbol in the wanted list, and the name and address of resolved symbol + """ + if ( + wanted_names_identifier not in wanted_symbols + and wanted_addresses_identifier not in wanted_symbols + ): + vollog.warning( + f"Invalid `wanted_symbols` sent to `find_symbols`. addresses and names keys both missing." + ) + return + + symbol_keys: List[Tuple[str, Callable]] = [ + (wanted_names_identifier, symbol_resolver.get_address_for_name), + (wanted_addresses_identifier, symbol_resolver.get_name_for_address), + ] + + for symbol_key, symbol_getter in symbol_keys: + # address or name + if symbol_key in wanted_symbols: + # walk each wanted address or name + for value_index, wanted_value in enumerate(wanted_symbols[symbol_key]): + symbol_value = symbol_getter(wanted_value) + + if symbol_value: + # yield out deletion key, deletion index, symbol name, symbol address + if symbol_key == wanted_names_identifier: + yield symbol_key, value_index, wanted_value, symbol_value # type: ignore + else: + yield symbol_key, value_index, symbol_value, wanted_value # type: ignore + + @staticmethod + def _resolve_symbols_through_methods( + context: interfaces.context.ContextInterface, + config_path: str, + module_instances: collected_modules_info, + wanted_modules: PESymbolFinder.cached_value_dict, + mod_name: str, + ) -> Tuple[found_symbols_module, PESymbolFinder.cached_module_lists]: + """ + Attempts to resolve every wanted symbol in `mod_name` + Every layer is enumerated for maximum chance of recovery + + Args: + module_instances: the set of layers in which the module was found + wanted_modules: The symbols to resolve tied to their module names + mod_name: name of the module to resolve symbols in + Returns: + Tuple[found_symbols_module, PESymbolFinder.cached_module_lists]: The set of found symbols and the ones that could not be resolved + """ + symbol_resolving_methods = [ + PESymbols._find_symbols_through_pdb, + PESymbols._find_symbols_through_exports, + ] + + found: found_symbols_module = [] + + # the symbols wanted from this module by the caller + wanted = wanted_modules[mod_name] + + # make a copy to remove from inside this function for returning to the caller + remaining = copy.deepcopy(wanted) + + done_processing = False + + for method in symbol_resolving_methods: + # every layer where this module was found through the given method + for symbol_resolver in method( + context, config_path, module_instances, mod_name + ): + vollog.debug(f"Have resolver for method {method}") + for ( + symbol_key, + value_index, + symbol_name, + symbol_address, + ) in PESymbols._get_symbol_value(remaining, symbol_resolver): + found.append((symbol_name, symbol_address)) + del remaining[symbol_key][value_index] + + # everything was resolved, stop this resolver + # remove this key from the remaining symbols to resolve + if not remaining[symbol_key]: + del remaining[symbol_key] + done_processing = True + break + + if done_processing: + break + + # stop all resolving + if done_processing: + break + + return found, remaining + + @staticmethod + def find_symbols( + context: 
interfaces.context.ContextInterface, + config_path: str, + wanted_modules: PESymbolFinder.cached_value_dict, + collected_modules: collected_modules_type, + ) -> Tuple[found_symbols_type, PESymbolFinder.cached_value_dict]: + """ + Loops through each method of symbol analysis until each wanted symbol is found + Returns the resolved symbols as a dictionary that includes the name and runtime address + + Args: + wanted_modules: the dictionary of modules and symbols to resolve. Modified to remove symbols as they are resolved. + collected_modules: return value from `get_kernel_modules` or `get_process_modules` + Returns: + Tuple[found_symbols_type, PESymbolFinder.cached_value_dict]: The set of found symbols but the ones that could not be resolved + """ + found_symbols: found_symbols_type = {} + missing_symbols: PESymbolFinder.cached_value_dict = {} + + for mod_name in wanted_modules: + if mod_name not in collected_modules: + continue + + module_instances = collected_modules[mod_name] + + # try to resolve the symbols for `mod_name` through each method (PDB and export table currently) + ( + found_in_module, + missing_in_module, + ) = PESymbols._resolve_symbols_through_methods( + context, config_path, module_instances, wanted_modules, mod_name + ) + + if found_in_module: + found_symbols[mod_name] = found_in_module + + if missing_in_module: + missing_symbols[mod_name] = missing_in_module + + return found_symbols, missing_symbols + + @staticmethod + def get_kernel_modules( + context: interfaces.context.ContextInterface, + layer_name: str, + symbol_table: str, + filter_modules: Optional[filter_modules_type], + ) -> collected_modules_type: + """ + Walks the kernel module list and finds the session layer, base, and size of each wanted module + + Args: + filter_modules: The modules to filter the gathering to. If left as None, all kernel modules are gathered. 
+ Returns: + collected_modules_type: The collection of modules found with at least one layer present + """ + found_modules: collected_modules_type = {} + + if filter_modules: + # create a tuple of module names for use with `endswith` + filter_modules_check = tuple([key.lower() for key in filter_modules.keys()]) + else: + filter_modules_check = None + + session_layers = list( + modules.Modules.get_session_layers(context, layer_name, symbol_table) + ) + + # special handling for the kernel + gather_kernel = ( + filter_modules_check and PESymbols.os_module_name in filter_modules_check + ) + + for index, mod in enumerate( + modules.Modules.list_modules(context, layer_name, symbol_table) + ): + try: + mod_name = str(mod.BaseDllName.get_string().lower()) + except exceptions.InvalidAddressException: + continue + + # to analyze, it must either be the kernel or a wanted module + if not filter_modules_check or (gather_kernel and index == 0): + mod_name = PESymbols.os_module_name + elif filter_modules_check and not mod_name.endswith(filter_modules_check): + continue + + # we won't find symbol information if we can't analyze the module + session_layer_name = modules.Modules.find_session_layer( + context, session_layers, mod.DllBase + ) + if not session_layer_name: + continue + + if mod_name not in found_modules: + found_modules[mod_name] = [] + + found_modules[mod_name].append( + (session_layer_name, mod.DllBase, mod.SizeOfImage) + ) + + return found_modules + + @staticmethod + def get_vads_for_process_cache( + vads_cache: Dict[int, ranges_type], + owner_proc: interfaces.objects.ObjectInterface, + ) -> Optional[ranges_type]: + """ + Creates and utilizes a cache of a process' VADs for efficient lookups + + Returns the vad information of the VAD hosting the address, if found + + Args: + vads_cache: The existing cache of VADs + owner_proc: The process being inspected + Returns: + Optional[ranges_type]: The range holding the address, if found + """ + if owner_proc.vol.offset in vads_cache: + vads = vads_cache[owner_proc.vol.offset] + else: + vads = PESymbols.get_proc_vads_with_file_paths(owner_proc) + vads_cache[owner_proc.vol.offset] = vads + + # smear or terminated process + if len(vads) == 0: + return None + + return vads + + @staticmethod + def get_proc_vads_with_file_paths( + proc: interfaces.objects.ObjectInterface, + ) -> ranges_type: + """ + Returns a list of the process' vads that map a file + + Args: + proc: The process to gather the VADs for + + Returns: + ranges_type: The list of VADs for this process that map a file + """ + vads: ranges_type = [] + + try: + vad_root = proc.get_vad_root() + except exceptions.InvalidAddressException: + return vads + + for vad in vad_root.traverse(): + filepath = vad.get_file_name() + + if not isinstance(filepath, str) or filepath.count("\\") == 0: + continue + + vads.append((vad.get_start(), vad.get_size(), filepath)) + + return vads + + @classmethod + def get_all_vads_with_file_paths( + cls, + context: interfaces.context.ContextInterface, + layer_name: str, + symbol_table_name: str, + ) -> Generator[ + Tuple[interfaces.objects.ObjectInterface, str, ranges_type], + None, + None, + ]: + """ + Yields each set of vads for a process that have a file mapped, along with the process itself and its layer + + Args: + Generator[Tuple[interfaces.objects.ObjectInterface, str, ranges_type]]: Yields tuple of process objects, layers, and VADs mapping files + """ + procs = pslist.PsList.list_processes( + context=context, + layer_name=layer_name, + 
symbol_table=symbol_table_name, + ) + + for proc in procs: + try: + proc_layer_name = proc.add_process_layer() + except exceptions.InvalidAddressException: + continue + + vads = cls.get_proc_vads_with_file_paths(proc) + + yield proc, proc_layer_name, vads + + @staticmethod + def get_process_modules( + context: interfaces.context.ContextInterface, + layer_name: str, + symbol_table: str, + filter_modules: Optional[filter_modules_type], + ) -> collected_modules_type: + """ + Walks the process list and each process' VAD to determine the base address and size of wanted modules + + Args: + filter_modules: The modules to filter the gathering to. If left as None, all process modules are gathered. + Returns: + collected_modules_type: The collection of modules found with at least one layer present + """ + proc_modules: collected_modules_type = {} + + if filter_modules: + # create a tuple of module names for use with `endswith` + filter_modules_check = tuple([key.lower() for key in filter_modules.keys()]) + else: + filter_modules_check = None + + for _proc, proc_layer_name, vads in PESymbols.get_all_vads_with_file_paths( + context, layer_name, symbol_table + ): + for vad_start, vad_size, filepath in vads: + filename = PESymbols.filename_for_path(filepath) + + if filter_modules_check and not filename.endswith(filter_modules_check): + continue + + # track each module along with the process layer and range to find it + if filename not in proc_modules: + proc_modules[filename] = [] + + proc_modules[filename].append((proc_layer_name, vad_start, vad_size)) + + return proc_modules + + def _generator(self) -> Generator[Tuple[int, Tuple[str, str, int]], None, None]: + kernel = self.context.modules[self.config["kernel"]] + + if self.config["symbols"]: + filter_module = { + self.config["module"].lower(): { + wanted_names_identifier: self.config["symbols"] + } + } + + elif self.config["addresses"]: + filter_module = { + self.config["module"].lower(): { + wanted_addresses_identifier: self.config["addresses"] + } + } + + else: + vollog.error("--address or --symbol must be specified") + return + + if self.config["source"] == "kernel": + module_resolver = self.get_kernel_modules + else: + module_resolver = self.get_process_modules + + collected_modules = module_resolver( + self.context, kernel.layer_name, kernel.symbol_table_name, filter_module + ) + + found_symbols, _missing_symbols = PESymbols.find_symbols( + self.context, self.config_path, filter_module, collected_modules + ) + + for module, symbols in found_symbols.items(): + for symbol, address in symbols: + yield (0, (module, symbol, format_hints.Hex(address))) + + def run(self) -> renderers.TreeGrid: + return renderers.TreeGrid( + [ + ("Module", str), + ("Symbol", str), + ("Address", format_hints.Hex), + ], + self._generator(), + ) diff --git a/volatility3/framework/plugins/windows/pslist.py b/volatility3/framework/plugins/windows/pslist.py index 8234b210f2..478cc8b1b6 100644 --- a/volatility3/framework/plugins/windows/pslist.py +++ b/volatility3/framework/plugins/windows/pslist.py @@ -4,7 +4,7 @@ import datetime import logging -from typing import Callable, Iterable, List, Type +from typing import Callable, Iterator, List, Type from volatility3.framework import renderers, interfaces, layers, exceptions, constants from volatility3.framework.configuration import requirements @@ -12,6 +12,7 @@ from volatility3.framework.renderers import format_hints from volatility3.framework.symbols import intermed from volatility3.framework.symbols.windows.extensions import pe 
+from volatility3.framework.symbols.windows import extensions from volatility3.plugins import timeliner vollog = logging.getLogger(__name__) @@ -197,7 +198,7 @@ def list_processes( filter_func: Callable[ [interfaces.objects.ObjectInterface], bool ] = lambda _: False, - ) -> Iterable[interfaces.objects.ObjectInterface]: + ) -> Iterator["extensions.EPROCESS"]: """Lists all the processes in the primary layer that are in the pid config option. diff --git a/volatility3/framework/plugins/windows/psxview.py b/volatility3/framework/plugins/windows/psxview.py index 71919c4101..a8d185a2c0 100644 --- a/volatility3/framework/plugins/windows/psxview.py +++ b/volatility3/framework/plugins/windows/psxview.py @@ -1,9 +1,14 @@ -import datetime, logging, string +import datetime +import logging +import string +from itertools import chain +from typing import Dict, Iterable, List from volatility3.framework import constants, exceptions -from volatility3.framework.interfaces import plugins from volatility3.framework.configuration import requirements -from volatility3.framework.renderers import format_hints, TreeGrid +from volatility3.framework.interfaces import plugins +from volatility3.framework.renderers import TreeGrid, format_hints +from volatility3.framework.symbols.windows import extensions from volatility3.plugins.windows import ( handles, info, @@ -71,20 +76,19 @@ def _proc_name_to_string(self, proc): "string", max_length=proc.ImageFileName.vol.count, errors="replace" ) - def _is_valid_proc_name(self, str): - for c in str: - if not c in self.valid_proc_name_chars: - return False - return True + def _is_valid_proc_name(self, string: str) -> bool: + return all(c in self.valid_proc_name_chars for c in string) - def _filter_garbage_procs(self, proc_list): + def _filter_garbage_procs( + self, proc_list: Iterable[extensions.EPROCESS] + ) -> List[extensions.EPROCESS]: return [ p for p in proc_list if p.is_valid() and self._is_valid_proc_name(self._proc_name_to_string(p)) ] - def _translate_offset(self, offset): + def _translate_offset(self, offset: int) -> int: if not self.config["physical-offsets"]: return offset @@ -100,21 +104,25 @@ def _translate_offset(self, offset): return offset - def _proc_list_to_dict(self, tasks): + def _proc_list_to_dict( + self, tasks: Iterable[extensions.EPROCESS] + ) -> Dict[int, extensions.EPROCESS]: tasks = self._filter_garbage_procs(tasks) return {self._translate_offset(proc.vol.offset): proc for proc in tasks} def _check_pslist(self, tasks): return self._proc_list_to_dict(tasks) - def _check_psscan(self, layer_name, symbol_table): + def _check_psscan( + self, layer_name: str, symbol_table: str + ) -> Dict[int, extensions.EPROCESS]: res = psscan.PsScan.scan_processes( context=self.context, layer_name=layer_name, symbol_table=symbol_table ) return self._proc_list_to_dict(res) - def _check_thrdscan(self): + def _check_thrdscan(self) -> Dict[int, extensions.EPROCESS]: ret = [] for ethread in thrdscan.ThrdScan.scan_threads( @@ -135,33 +143,38 @@ def _check_thrdscan(self): return self._proc_list_to_dict(ret) - def _check_csrss_handles(self, tasks, layer_name, symbol_table): - ret = [] + def _check_csrss_handles( + self, tasks: Iterable[extensions.EPROCESS], layer_name: str, symbol_table: str + ) -> Dict[int, extensions.EPROCESS]: + ret: List[extensions.EPROCESS] = [] + + handles_plugin = handles.Handles( + context=self.context, config_path=self.config_path + ) + + type_map = handles_plugin.get_type_map(self.context, layer_name, symbol_table) + + cookie = handles_plugin.find_cookie( 
+ context=self.context, + layer_name=layer_name, + symbol_table=symbol_table, + ) for p in tasks: name = self._proc_name_to_string(p) - if name == "csrss.exe": - try: - if p.has_member("ObjectTable"): - handles_plugin = handles.Handles( - context=self.context, config_path=self.config_path - ) - hndls = list(handles_plugin.handles(p.ObjectTable)) - for h in hndls: - if ( - h.get_object_type( - handles_plugin.get_type_map( - self.context, layer_name, symbol_table - ) - ) - == "Process" - ): - ret.append(h.Body.cast("_EPROCESS")) - - except exceptions.InvalidAddressException: - vollog.log( - constants.LOGLEVEL_VVV, "Cannot access eprocess object table" - ) + if name != "csrss.exe": + continue + + try: + ret += [ + handle.Body.cast("_EPROCESS") + for handle in handles_plugin.handles(p.ObjectTable) + if handle.get_object_type(type_map, cookie) == "Process" + ] + except exceptions.InvalidAddressException: + vollog.log( + constants.LOGLEVEL_VVV, "Cannot access eprocess object table" + ) return self._proc_list_to_dict(ret) @@ -178,7 +191,7 @@ def _generator(self): ) # get processes from each source - processes = {} + processes: Dict[str, Dict[int, extensions.EPROCESS]] = {} processes["pslist"] = self._check_pslist(kdbg_list_processes) processes["psscan"] = self._check_psscan(layer_name, symbol_table) @@ -187,27 +200,20 @@ def _generator(self): kdbg_list_processes, layer_name, symbol_table ) - # print results - - # list of lists of offsets - offsets = [list(processes[source].keys()) for source in processes] - - # flatten to one list - offsets = sum(offsets, []) - - # remove duplicates - offsets = set(offsets) + # Unique set of all offsets from all sources + offsets = set(chain(*(mapping.keys() for mapping in processes.values()))) for offset in offsets: - proc = None + # We know there will be at least one process mapped to each offset + proc: extensions.EPROCESS = next( + mapping[offset] for mapping in processes.values() if offset in mapping + ) in_sources = {src: False for src in processes} - for source in processes: - if offset in processes[source]: + for source, process_mapping in processes.items(): + if offset in process_mapping: in_sources[source] = True - if not proc: - proc = processes[source][offset] pid = proc.UniqueProcessId name = self._proc_name_to_string(proc) diff --git a/volatility3/framework/plugins/windows/unhooked_system_calls.py b/volatility3/framework/plugins/windows/unhooked_system_calls.py new file mode 100644 index 0000000000..1a1e599407 --- /dev/null +++ b/volatility3/framework/plugins/windows/unhooked_system_calls.py @@ -0,0 +1,208 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 + +# Full details on the techniques used in these plugins to detect EDR-evading malware +# can be found in our 20 page whitepaper submitted to DEFCON along with the presentation +# https://www.volexity.com/wp-content/uploads/2024/08/Defcon24_EDR_Evasion_Detection_White-Paper_Andrew-Case.pdf + +import logging + +from typing import Dict, Tuple, List, Generator + +from volatility3.framework import interfaces, exceptions +from volatility3.framework import renderers +from volatility3.framework.configuration import requirements +from volatility3.framework.objects import utility +from volatility3.plugins.windows import pslist, pe_symbols + +vollog = logging.getLogger(__name__) + + +class unhooked_system_calls(interfaces.plugins.PluginInterface): + """Looks for signs 
of Skeleton Key malware""" + + _required_framework_version = (2, 4, 0) + + system_calls = { + "ntdll.dll": { + pe_symbols.wanted_names_identifier: [ + "NtCreateThread", + "NtProtectVirtualMemory", + "NtReadVirtualMemory", + "NtOpenProcess", + "NtWriteFile", + "NtQueryVirtualMemory", + "NtAllocateVirtualMemory", + "NtWorkerFactoryWorkerReady", + "NtAcceptConnectPort", + "NtAddDriverEntry", + "NtAdjustPrivilegesToken", + "NtAlpcCreatePort", + "NtClose", + "NtCreateFile", + "NtCreateMutant", + "NtOpenFile", + "NtOpenIoCompletion", + "NtOpenJobObject", + "NtOpenKey", + "NtOpenKeyEx", + "NtOpenThread", + "NtOpenThreadToken", + "NtOpenThreadTokenEx", + "NtWriteVirtualMemory", + "NtTraceEvent", + "NtTranslateFilePath", + "NtUmsThreadYield", + "NtUnloadDriver", + "NtUnloadKey", + "NtUnloadKey2", + "NtUnloadKeyEx", + "NtCreateKey", + "NtCreateSection", + "NtDeleteKey", + "NtDeleteValueKey", + "NtDuplicateObject", + "NtQueryValueKey", + "NtReplaceKey", + "NtRequestWaitReplyPort", + "NtRestoreKey", + "NtSetContextThread", + "NtSetSecurityObject", + "NtSetValueKey", + "NtSystemDebugControl", + "NtTerminateProcess", + ] + } + } + + # This data structure is used to track unique implementations of functions across processes + # The outer dictionary holds the module name (e.g., ntdll.dll) + # The next dictionary holds the function names (NtTerminateProcess, NtSetValueKey, etc.) inside a module + # The innermost dictionary holds the unique implementation (bytes) of a function across processes + # Each implementation is tracked along with the process(es) that host it + # For systems without malware, all functions should have the same implementation + # When API hooking/module unhooking is done, the victim (infected) processes will have unique implementations + _code_bytes_type = Dict[str, Dict[str, Dict[bytes, List[Tuple[int, str]]]]] + + @classmethod + def get_requirements(cls) -> List: + # Since we're calling the plugin, make sure we have the plugin's requirements + return [ + requirements.ModuleRequirement( + name="kernel", + description="Windows kernel", + architectures=["Intel32", "Intel64"], + ), + requirements.VersionRequirement( + name="pslist", component=pslist.PsList, version=(2, 0, 0) + ), + requirements.PluginRequirement( + name="pe_symbols", plugin=pe_symbols.PESymbols, version=(1, 0, 0) + ), + ] + + def _gather_code_bytes( + self, + kernel: interfaces.context.ModuleInterface, + found_symbols: pe_symbols.found_symbols_type, + ) -> _code_bytes_type: + """ + Enumerates the desired DLLs and function implementations in each process + Groups based on unique implementations of each DLLs' functions + The purpose is to detect when a function has different implementations (code) + in different processes. + This very effectively detects code injection. 
+ """ + code_bytes: unhooked_system_calls._code_bytes_type = {} + + procs = pslist.PsList.list_processes( + context=self.context, + layer_name=kernel.layer_name, + symbol_table=kernel.symbol_table_name, + ) + + for proc in procs: + try: + proc_id = proc.UniqueProcessId + proc_name = utility.array_to_string(proc.ImageFileName) + proc_layer_name = proc.add_process_layer() + except exceptions.InvalidAddressException: + continue + + for dll_name, functions in found_symbols.items(): + for func_name, func_addr in functions: + try: + fbytes = self.context.layers[proc_layer_name].read( + func_addr, 0x20 + ) + except exceptions.InvalidAddressException: + continue + + # see the definition of _code_bytes_type for details of this data structure + if dll_name not in code_bytes: + code_bytes[dll_name] = {} + + if func_name not in code_bytes[dll_name]: + code_bytes[dll_name][func_name] = {} + + if fbytes not in code_bytes[dll_name][func_name]: + code_bytes[dll_name][func_name][fbytes] = [] + + code_bytes[dll_name][func_name][fbytes].append((proc_id, proc_name)) + + return code_bytes + + def _generator(self) -> Generator[Tuple[int, Tuple[str, str, int]], None, None]: + kernel = self.context.modules[self.config["kernel"]] + + found_symbols = pe_symbols.PESymbols.addresses_for_process_symbols( + self.context, + self.config_path, + kernel.layer_name, + kernel.symbol_table_name, + unhooked_system_calls.system_calls, + ) + + # code_bytes[dll_name][func_name][func_bytes] + code_bytes = self._gather_code_bytes(kernel, found_symbols) + + # walk the functions that were evaluated + for functions in code_bytes.values(): + # cbb is the distinct groups of bytes (instructions) + # for this function across processes + for func_name, cbb in functions.items(): + # the dict key here is the raw instructions, which is not helpful to look at + # the values are the list of tuples for the (proc_id, proc_name) pairs for this set of bytes (instructions) + cb = list(cbb.values()) + + # if all processes map to the same implementation, then no malware is present + if len(cb) == 1: + yield 0, (func_name, "", len(cb[0])) + else: + # if there are differing implementations then it means + # that malware has overwritten system call(s) in infected processes + # max_idx and small_idx find which implementation of a system call has the least processes + # as all observed malware and open source projects only infected a few targets, leaving the + # rest with the original EDR hooks in place + max_idx = 0 if len(cb[0]) > len(cb[1]) else 1 + small_idx = (~max_idx) & 1 + + ps = [] + + # gather processes on small_idx since these are the malware infected ones + for pid, pname in cb[small_idx]: + ps.append("{:d}:{}".format(pid, pname)) + + proc_names = ", ".join(ps) + + yield 0, (func_name, proc_names, len(cb[max_idx])) + + def run(self) -> renderers.TreeGrid: + return renderers.TreeGrid( + [ + ("Function", str), + ("Distinct Implementations", str), + ("Total Implementations", int), + ], + self._generator(), + ) diff --git a/volatility3/framework/plugins/windows/vadinfo.py b/volatility3/framework/plugins/windows/vadinfo.py index abc6142fe0..2c6ed4dafc 100644 --- a/volatility3/framework/plugins/windows/vadinfo.py +++ b/volatility3/framework/plugins/windows/vadinfo.py @@ -3,7 +3,7 @@ # import logging -from typing import Callable, List, Generator, Iterable, Type, Optional +from typing import Callable, List, Generator, Iterable, Type, Optional, Tuple from volatility3.framework import renderers, interfaces, exceptions from 
volatility3.framework.configuration import requirements @@ -196,11 +196,31 @@ def vad_dump( return file_handle - def _generator(self, procs): + def _generator(self, procs: List[interfaces.objects.ObjectInterface]) -> Generator[ + Tuple[ + int, + Tuple[ + int, + str, + format_hints.Hex, + format_hints.Hex, + format_hints.Hex, + str, + str, + int, + int, + format_hints.Hex, + str, + str, + ], + ], + None, + None, + ]: kernel = self.context.modules[self.config["kernel"]] kernel_layer = self.context.layers[kernel.layer_name] - def passthrough(_: interfaces.objects.ObjectInterface) -> bool: + def passthrough(x: interfaces.objects.ObjectInterface) -> bool: return False filter_func = passthrough @@ -250,7 +270,7 @@ def filter_function(x: interfaces.objects.ObjectInterface) -> bool: ), ) - def run(self): + def run(self) -> renderers.TreeGrid: kernel = self.context.modules[self.config["kernel"]] filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None)) diff --git a/volatility3/framework/plugins/windows/vadyarascan.py b/volatility3/framework/plugins/windows/vadyarascan.py index 7bc3377c38..efcc70d07f 100644 --- a/volatility3/framework/plugins/windows/vadyarascan.py +++ b/volatility3/framework/plugins/windows/vadyarascan.py @@ -18,7 +18,7 @@ class VadYaraScan(interfaces.plugins.PluginInterface): """Scans all the Virtual Address Descriptor memory maps using yara.""" _required_framework_version = (2, 4, 0) - _version = (1, 1, 0) + _version = (1, 1, 1) @classmethod def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: @@ -68,7 +68,7 @@ def _generator(self): layer = self.context.layers[layer_name] for start, size in self.get_vad_maps(task): if size > sanity_check: - vollog.warn( + vollog.debug( f"VAD at 0x{start:x} over sanity-check size, not scanning" ) continue diff --git a/volatility3/framework/plugins/windows/verinfo.py b/volatility3/framework/plugins/windows/verinfo.py index 5b3c52bf6a..57b8dcd3fb 100644 --- a/volatility3/framework/plugins/windows/verinfo.py +++ b/volatility3/framework/plugins/windows/verinfo.py @@ -48,9 +48,6 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface] requirements.PluginRequirement( name="modules", plugin=modules.Modules, version=(2, 0, 0) ), - requirements.VersionRequirement( - name="dlllist", component=dlllist.DllList, version=(2, 0, 0) - ), requirements.BooleanRequirement( name="extensive", description="Search physical layer for version information", diff --git a/volatility3/framework/symbols/linux/__init__.py b/volatility3/framework/symbols/linux/__init__.py index 6e92975178..7815bcb1c8 100644 --- a/volatility3/framework/symbols/linux/__init__.py +++ b/volatility3/framework/symbols/linux/__init__.py @@ -169,13 +169,30 @@ def _get_new_sock_pipe_path(cls, context, task, filp) -> str: Returns: str: Sock pipe pathname relative to the task's root directory. """ + # FIXME: This function must be moved to the 'dentry' object extension + # Also, the scope of this function went beyond the sock pipe path, so we need to rename this. 
+ # Once https://github.com/volatilityfoundation/volatility3/pull/1263 is merged, replace the + # dentry inode getters + + if not (filp and filp.is_readable()): + return f" {filp:x}" + dentry = filp.get_dentry() + if not (dentry and dentry.is_readable()): + return f" {dentry:x}" kernel_module = cls.get_module_from_volobj_type(context, dentry) sym_addr = dentry.d_op.d_dname + if not (sym_addr and sym_addr.is_readable()): + return f" {sym_addr:x}" + symbs = list(kernel_module.get_symbols_by_absolute_location(sym_addr)) + inode = dentry.d_inode + if not (inode and inode.is_readable() and inode.is_valid()): + return f" {inode:x}" + if len(symbs) == 1: sym = symbs[0].split(constants.BANG)[1] @@ -191,15 +208,41 @@ def _get_new_sock_pipe_path(cls, context, task, filp) -> str: elif sym == "simple_dname": pre_name = cls._get_path_file(task, filp) + elif sym == "ns_dname": + # From Kernels 3.19 + + # In Kernels >= 6.9, see Linux kernel commit 1fa08aece42512be072351f482096d5796edf7ca + # ns_common->stashed change from 'atomic64_t' to 'dentry*' + try: + ns_common_type = kernel_module.get_type("ns_common") + stashed_template = ns_common_type.child_template("stashed") + stashed_type_full_name = stashed_template.vol.type_name + stashed_type_name = stashed_type_full_name.split(constants.BANG)[1] + if stashed_type_name == "atomic64_t": + # 3.19 <= Kernels < 6.9 + fsdata_ptr = dentry.d_fsdata + if not (fsdata_ptr and fsdata_ptr.is_readable()): + raise IndexError + + ns_ops = fsdata_ptr.dereference().cast("proc_ns_operations") + else: + # Kernels >= 6.9 + private_ptr = inode.i_private + if not (private_ptr and private_ptr.is_readable()): + raise IndexError + + ns_common = private_ptr.dereference().cast("ns_common") + ns_ops = ns_common.ops + + pre_name = utility.pointer_to_string(ns_ops.name, 255) + except IndexError: + pre_name = "" else: - pre_name = f"" - - ret = f"{pre_name}:[{dentry.d_inode.i_ino:d}]" - + pre_name = f" {sym}" else: - ret = f" {sym_addr:x}" + pre_name = f" {sym_addr:x}" - return ret + return f"{pre_name}:[{inode.i_ino:d}]" @classmethod def path_for_file(cls, context, task, filp) -> str: diff --git a/volatility3/framework/symbols/windows/extensions/callbacks.py b/volatility3/framework/symbols/windows/extensions/callbacks.py index f894644dbb..f54db39f20 100644 --- a/volatility3/framework/symbols/windows/extensions/callbacks.py +++ b/volatility3/framework/symbols/windows/extensions/callbacks.py @@ -1,4 +1,5 @@ import logging +from typing import Dict from volatility3.framework import exceptions, objects from volatility3.framework.symbols.windows.extensions import pool @@ -24,8 +25,30 @@ def is_valid(self) -> bool: and self.Entry.Blink.is_readable() and self.DeviceObject.is_readable() ): + vollog.debug( + f"Callback obj 0x{self.vol.offset:x} invalid due to unreadable structure members" + ) return False + except exceptions.InvalidAddressException: + vollog.debug( + f"callback obj 0x{self.vol.offset:x} invalid due to invalid address access" + ) + return False + + return True + + def is_parseable(self, type_map: Dict[int, str]) -> bool: + """ + Determines whether or not this `_SHUTDOWN_PACKET` callback can be reliably parsed. + Requires a `type_map` that maps NT executive object type indices to string representations. + This type map can be acquired via the `handles.Handles.get_type_map` classmethod. 
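+
+        Illustrative usage (the local variable names are hypothetical):
+            type_map = handles.Handles.get_type_map(context, layer_name, symbol_table)
+            if shutdown_packet.is_parseable(type_map):
+                ...  # safe to walk the DeviceObject/DriverObject fields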
+ """ + if not self.is_valid(): + return False + + try: + device = self.DeviceObject if not device or not (device.DriverObject.DriverStart % 0x1000 == 0): vollog.debug( @@ -33,18 +56,23 @@ def is_valid(self) -> bool: ) return False + header = device.get_object_header() + object_type = header.get_object_type(type_map) + is_valid = object_type == "Device" + if not is_valid: + vollog.debug( + f"Callback obj 0x{self.vol.offset:x} invalid due to invalid device type: wanted 'Device', found '{object_type}'" + ) + return is_valid except exceptions.InvalidAddressException: vollog.debug( f"callback obj 0x{self.vol.offset:x} invalid due to invalid address access" ) return False - - try: - header = device.get_object_header() - valid = header.NameInfo.Name == "Device" - return valid except ValueError: - vollog.debug(f"Could not get NameInfo for object at 0x{self.vol.offset:x}") + vollog.debug( + f"Could not get object type for object at 0x{self.vol.offset:x}" + ) return False diff --git a/volatility3/symbols/generic/vmcs/nehalem-architecture.json b/volatility3/symbols/generic/vmcs/nehalem-architecture.json new file mode 100644 index 0000000000..ae0ab28633 --- /dev/null +++ b/volatility3/symbols/generic/vmcs/nehalem-architecture.json @@ -0,0 +1,131 @@ +{ + "base_types": { + "pointer": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 8 + }, + "unsigned char": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 1 + }, + "unsigned long": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 4 + }, + "unsigned long long": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 8 + }, + "unsigned short": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 2 + } + }, + "enums": {}, + "metadata": { + "format": "6.1.0", + "producer": { + "datetime": "2021-07-31T17:37:28.302702", + "name": "vmextract-by-hand", + "version": "0.0.1" + } + }, + "symbols": { + "revision_id": { + "address": 0, + "constant_data": "MTQ=" + } + }, + "user_types": { + "_VMCS": { + "fields": { + "ept": { + "offset": 232, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "executive_vmcs_ptr": { + "offset": 208, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_cr3": { + "offset": 736, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_cr4": { + "offset": 744, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_pdpte": { + "offset": 928, + "type": { + "count": 4, + "kind": "array", + "subtype": { + "kind": "struct", + "name": "unsigned long long" + } + } + }, + "guest_physical_addr": { + "offset": 240, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "host_cr3": { + "offset": 832, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "host_cr4": { + "offset": 840, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "vmcs_link_ptr": { + "offset": 248, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "vpid": { + "offset": 752, + "type": { + "kind": "struct", + "name": "unsigned short" + } + } + }, + "kind": "struct", + "size": 4096 + } + } +} \ No newline at end of file diff --git a/volatility3/symbols/generic/vmcs/sandybridge-architecture.json b/volatility3/symbols/generic/vmcs/sandybridge-architecture.json new file mode 100644 index 0000000000..b2b3cfebfe --- /dev/null +++ b/volatility3/symbols/generic/vmcs/sandybridge-architecture.json @@ -0,0 +1,131 
@@ +{ + "base_types": { + "pointer": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 8 + }, + "unsigned char": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 1 + }, + "unsigned long": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 4 + }, + "unsigned long long": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 8 + }, + "unsigned short": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 2 + } + }, + "enums": {}, + "metadata": { + "format": "6.1.0", + "producer": { + "datetime": "2021-07-31T17:37:28.311608", + "name": "vmextract-by-hand", + "version": "0.0.1" + } + }, + "symbols": { + "revision_id": { + "address": 0, + "constant_data": "MTY=" + } + }, + "user_types": { + "_VMCS": { + "fields": { + "ept": { + "offset": 232, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "executive_vmcs_ptr": { + "offset": 208, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_cr3": { + "offset": 736, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_cr4": { + "offset": 744, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_pdpte": { + "offset": 928, + "type": { + "count": 4, + "kind": "array", + "subtype": { + "kind": "struct", + "name": "unsigned long long" + } + } + }, + "guest_physical_addr": { + "offset": 240, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "host_cr3": { + "offset": 832, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "host_cr4": { + "offset": 840, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "vmcs_link_ptr": { + "offset": 248, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "vpid": { + "offset": 752, + "type": { + "kind": "struct", + "name": "unsigned short" + } + } + }, + "kind": "struct", + "size": 4096 + } + } +} \ No newline at end of file diff --git a/volatility3/symbols/generic/vmcs/westmere-architecture.json b/volatility3/symbols/generic/vmcs/westmere-architecture.json new file mode 100644 index 0000000000..2769f70812 --- /dev/null +++ b/volatility3/symbols/generic/vmcs/westmere-architecture.json @@ -0,0 +1,131 @@ +{ + "base_types": { + "pointer": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 8 + }, + "unsigned char": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 1 + }, + "unsigned long": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 4 + }, + "unsigned long long": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 8 + }, + "unsigned short": { + "endian": "little", + "kind": "int", + "signed": false, + "size": 2 + } + }, + "enums": {}, + "metadata": { + "format": "6.1.0", + "producer": { + "datetime": "2021-07-31T17:37:28.314801", + "name": "vmextract-by-hand", + "version": "0.0.1" + } + }, + "symbols": { + "revision_id": { + "address": 0, + "constant_data": "MTU=" + } + }, + "user_types": { + "_VMCS": { + "fields": { + "ept": { + "offset": 320, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "executive_vmcs_ptr": { + "offset": 208, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_cr3": { + "offset": 736, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_cr4": { + "offset": 744, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "guest_pdpte": { + 
"offset": 928, + "type": { + "count": 4, + "kind": "array", + "subtype": { + "kind": "struct", + "name": "unsigned long long" + } + } + }, + "guest_physical_addr": { + "offset": 328, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "host_cr3": { + "offset": 832, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "host_cr4": { + "offset": 840, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "vmcs_link_ptr": { + "offset": 248, + "type": { + "kind": "struct", + "name": "unsigned long long" + } + }, + "vpid": { + "offset": 220, + "type": { + "kind": "struct", + "name": "unsigned short" + } + } + }, + "kind": "struct", + "size": 4096 + } + } +} \ No newline at end of file