diff --git a/volatility3/framework/automagic/linux.py b/volatility3/framework/automagic/linux.py index f22cae012..58195744b 100644 --- a/volatility3/framework/automagic/linux.py +++ b/volatility3/framework/automagic/linux.py @@ -7,6 +7,7 @@ from volatility3.framework import constants, interfaces from volatility3.framework.automagic import symbol_cache, symbol_finder +from volatility3.framework.configuration import requirements from volatility3.framework.layers import intel, scanners from volatility3.framework.symbols import linux @@ -61,8 +62,12 @@ def stack( isf_url=isf_path, ) context.symbol_space.append(table) + kaslr_shift, aslr_shift = cls.find_aslr( - context, table_name, layer_name, progress_callback=progress_callback + context, + table_name, + layer_name, + progress_callback=progress_callback, ) if "init_top_pgt" in table.symbols: @@ -111,7 +116,17 @@ def find_aslr( progress_callback: constants.ProgressCallback = None, ) -> Tuple[int, int]: """Determines the offset of the actual DTB in physical space and its - symbol offset.""" + symbol offset. + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + symbol_table: The name of the kernel module on which to operate + layer_name: The layer within the context in which the module exists + progress_callback: A function that takes a percentage (and an optional description) that will be called periodically + + Returns: + kaslr_shirt and aslr_shift + """ init_task_symbol = symbol_table + constants.BANG + "init_task" init_task_json_address = context.symbol_space.get_symbol( init_task_symbol @@ -179,8 +194,8 @@ def find_aslr( vollog.debug("Scanners could not determine any ASLR shifts, using 0 for both") return 0, 0 - @classmethod - def virtual_to_physical_address(cls, addr: int) -> int: + @staticmethod + def virtual_to_physical_address(addr: int) -> int: """Converts a virtual linux address to a physical one (does not account of ASLR)""" if addr > 0xFFFFFFFF80000000: @@ -199,3 +214,212 @@ class LinuxSymbolFinder(symbol_finder.SymbolFinder): @classmethod def find_aslr(cls, *args): return LinuxIntelStacker.find_aslr(*args)[1] + + +class LinuxIntelVMCOREINFOStacker(interfaces.automagic.StackerLayerInterface): + stack_order = 34 + exclusion_list = ["mac", "windows"] + + @staticmethod + def _check_versions() -> bool: + """Verify the versions of the required modules""" + # Check VMCOREINFO API version + vmcoreinfo_version_required = (1, 0, 0) + if not requirements.VersionRequirement.matches_required( + vmcoreinfo_version_required, linux.VMCoreInfo.version + ): + vollog.info( + "VMCOREINFO version not suitable: required %s found %s", + vmcoreinfo_version_required, + linux.VMCoreInfo.version, + ) + return False + + return True + + @classmethod + def stack( + cls, + context: interfaces.context.ContextInterface, + layer_name: str, + progress_callback: constants.ProgressCallback = None, + ) -> Optional[interfaces.layers.DataLayerInterface]: + """Attempts to identify linux within this layer.""" + + # Verify the versions of the required modules + if not cls._check_versions(): + return None + + # Bail out by default unless we can stack properly + layer = context.layers[layer_name] + + # Never stack on top of an intel layer + # FIXME: Find a way to improve this check + if isinstance(layer, intel.Intel): + return None + + linux_banners = symbol_cache.load_cache_manager().get_identifier_dictionary( + operating_system="linux" + ) + if not linux_banners: + # If we have no banners, don't bother scanning + vollog.info( + "No Linux banners found - if this is a linux plugin, please check your " + "symbol files location" + ) + return None + + vmcoreinfo_elf_notes_iter = linux.VMCoreInfo.search_vmcoreinfo_elf_note( + context=context, + layer_name=layer_name, + progress_callback=progress_callback, + ) + + # Iterate through each VMCOREINFO ELF note found, using the first one that is valid. + for _vmcoreinfo_offset, vmcoreinfo in vmcoreinfo_elf_notes_iter: + shifts = cls._vmcoreinfo_find_aslr(vmcoreinfo) + if not shifts: + # Let's try the next VMCOREINFO, in case this one isn't correct. + continue + + kaslr_shift, aslr_shift = shifts + + dtb = cls._vmcoreinfo_get_dtb(vmcoreinfo, aslr_shift, kaslr_shift) + if dtb is None: + # Discard this VMCOREINFO immediately + continue + + is_32bit, is_pae = cls._vmcoreinfo_is_32bit(vmcoreinfo) + if is_32bit: + layer_class = intel.IntelPAE if is_pae else intel.Intel + else: + layer_class = intel.Intel32e + + uts_release = vmcoreinfo["OSRELEASE"] + + # See how linux_banner constant is built in the linux kernel + linux_version_prefix = f"Linux version {uts_release} (".encode() + valid_banners = [ + x for x in linux_banners if x and x.startswith(linux_version_prefix) + ] + if not valid_banners: + # There's no banner matching this VMCOREINFO, keep trying with the next one + continue + elif len(valid_banners) == 1: + # Usually, we narrow down the Linux banner list to a single element. + # Using BytesScanner here is slightly faster than MultiStringScanner. + scanner = scanners.BytesScanner(valid_banners[0]) + else: + scanner = scanners.MultiStringScanner(valid_banners) + + join = interfaces.configuration.path_join + for match in layer.scan( + context=context, scanner=scanner, progress_callback=progress_callback + ): + # Unfortunately, the scanners do not maintain a consistent interface + banner = match[1] if isinstance(match, Tuple) else valid_banners[0] + + isf_path = linux_banners.get(banner, None) + if not isf_path: + vollog.warning( + "Identified banner %r, but no matching ISF is available.", + banner, + ) + continue + + vollog.debug("Identified banner: %r", banner) + table_name = context.symbol_space.free_table_name("LintelStacker") + table = linux.LinuxKernelIntermedSymbols( + context, + f"temporary.{table_name}", + name=table_name, + isf_url=isf_path, + ) + context.symbol_space.append(table) + + # Build the new layer + new_layer_name = context.layers.free_layer_name("primary") + config_path = join("vmcoreinfo", new_layer_name) + kernel_banner = LinuxSymbolFinder.banner_config_key + banner_str = banner.decode(encoding="latin-1") + context.config[join(config_path, kernel_banner)] = banner_str + context.config[join(config_path, "memory_layer")] = layer_name + context.config[join(config_path, "page_map_offset")] = dtb + context.config[join(config_path, "kernel_virtual_offset")] = aslr_shift + layer = layer_class( + context, + config_path=config_path, + name=new_layer_name, + metadata={"os": "Linux"}, + ) + + if layer: + vollog.debug( + "Values found in VMCOREINFO: KASLR=0x%x, ASLR=0x%x, DTB=0x%x", + kaslr_shift, + aslr_shift, + dtb, + ) + + return layer + + vollog.debug("No suitable linux banner could be matched") + return None + + @staticmethod + def _vmcoreinfo_find_aslr(vmcoreinfo) -> Tuple[int, int]: + phys_base = vmcoreinfo.get("NUMBER(phys_base)") + if phys_base is None: + # In kernel < 4.10, there may be a SYMBOL(phys_base), but as noted in the + # c401721ecd1dcb0a428aa5d6832ee05ffbdbffbbe commit comment, this value + # isn't useful for calculating the physical address. + # There's nothing we can do here, so let's try with the next VMCOREINFO or + # the next Stacker. + return None + + # kernels 3.14 (b6085a865762236bb84934161273cdac6dd11c2d) KERNELOFFSET was added + kerneloffset = vmcoreinfo.get("KERNELOFFSET") + if kerneloffset is None: + # kernels < 3.14 if KERNELOFFSET is missing, KASLR might not be implemented. + # Oddly, NUMBER(phys_base) is present without it. To be safe, proceed only + # if both are present. + return None + + aslr_shift = kerneloffset + kaslr_shift = phys_base + aslr_shift + + return kaslr_shift, aslr_shift + + @staticmethod + def _vmcoreinfo_get_dtb(vmcoreinfo, aslr_shift, kaslr_shift) -> int: + """Returns the page global directory physical address (a.k.a DTB or PGD)""" + # In x86-64, since kernels 2.5.22 swapper_pg_dir is a macro to the respective pgd. + # First, in e3ebadd95cb621e2c7436f3d3646447ac9d5c16d to init_level4_pgt, and later + # in kernels 4.13 in 65ade2f872b474fa8a04c2d397783350326634e6) to init_top_pgt. + # In x86-32, the pgd is swapper_pg_dir. So, in any case, for VMCOREINFO + # SYMBOL(swapper_pg_dir) will always have the right value. + dtb_vaddr = vmcoreinfo.get("SYMBOL(swapper_pg_dir)") + if dtb_vaddr is None: + # Abort, it should be present + return None + + dtb_paddr = ( + LinuxIntelStacker.virtual_to_physical_address(dtb_vaddr) + - aslr_shift + + kaslr_shift + ) + + return dtb_paddr + + @staticmethod + def _vmcoreinfo_is_32bit(vmcoreinfo) -> Tuple[bool, bool]: + """Returns a tuple of booleans with is_32bit and is_pae values""" + is_pae = vmcoreinfo.get("CONFIG_X86_PAE", "n") == "y" + if is_pae: + is_32bit = True + else: + # Check the swapper_pg_dir virtual address size + dtb_vaddr = vmcoreinfo["SYMBOL(swapper_pg_dir)"] + is_32bit = dtb_vaddr <= 2**32 + + return is_32bit, is_pae diff --git a/volatility3/framework/constants/linux/__init__.py b/volatility3/framework/constants/linux/__init__.py index 6e49e6f37..e77193479 100644 --- a/volatility3/framework/constants/linux/__init__.py +++ b/volatility3/framework/constants/linux/__init__.py @@ -352,3 +352,9 @@ def flags(self) -> str: MODULE_MAXIMUM_CORE_SIZE = 20000000 MODULE_MAXIMUM_CORE_TEXT_SIZE = 20000000 MODULE_MINIMUM_SIZE = 4096 + +# VMCOREINFO +VMCOREINFO_MAGIC = b"VMCOREINFO\x00" +# Aligned to 4 bytes. See storenote() in kernels < 4.19 or append_kcore_note() in kernels >= 4.19 +VMCOREINFO_MAGIC_ALIGNED = VMCOREINFO_MAGIC + b"\x00" +OSRELEASE_TAG = b"OSRELEASE=" diff --git a/volatility3/framework/plugins/linux/vmcoreinfo.py b/volatility3/framework/plugins/linux/vmcoreinfo.py new file mode 100644 index 000000000..0f07f8589 --- /dev/null +++ b/volatility3/framework/plugins/linux/vmcoreinfo.py @@ -0,0 +1,54 @@ +# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# + +from typing import List + +from volatility3.framework import renderers, interfaces +from volatility3.framework.configuration import requirements +from volatility3.framework.interfaces import plugins +from volatility3.framework.symbols import linux +from volatility3.framework.renderers import format_hints + + +class VMCoreInfo(plugins.PluginInterface): + """Enumerate VMCoreInfo tables""" + + _required_framework_version = (2, 11, 0) + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.TranslationLayerRequirement( + name="primary", description="Memory layer to scan" + ), + requirements.VersionRequirement( + name="VMCoreInfo", component=linux.VMCoreInfo, version=(1, 0, 0) + ), + ] + + def _generator(self): + layer_name = self.config["primary"] + for ( + vmcoreinfo_offset, + vmcoreinfo, + ) in linux.VMCoreInfo.search_vmcoreinfo_elf_note( + context=self.context, + layer_name=layer_name, + ): + for key, value in vmcoreinfo.items(): + if key.startswith("SYMBOL(") or key == "KERNELOFFSET": + value = f"0x{value:x}" + else: + value = str(value) + + yield 0, (format_hints.Hex(vmcoreinfo_offset), key, value) + + def run(self): + headers = [ + ("Offset", format_hints.Hex), + ("Key", str), + ("Value", str), + ] + return renderers.TreeGrid(headers, self._generator()) diff --git a/volatility3/framework/symbols/linux/__init__.py b/volatility3/framework/symbols/linux/__init__.py index 0230a9c48..11eaaeddd 100644 --- a/volatility3/framework/symbols/linux/__init__.py +++ b/volatility3/framework/symbols/linux/__init__.py @@ -2,15 +2,18 @@ # which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 # import math +import string import contextlib from abc import ABC, abstractmethod -from typing import Iterator, List, Tuple, Optional, Union +from typing import Iterator, List, Tuple, Optional, Union, Dict from volatility3 import framework from volatility3.framework import constants, exceptions, interfaces, objects from volatility3.framework.objects import utility from volatility3.framework.symbols import intermed from volatility3.framework.symbols.linux import extensions +from volatility3.framework.layers import scanners +from volatility3.framework.constants import linux as linux_constants class LinuxKernelIntermedSymbols(intermed.IntermediateSymbolTable): @@ -830,3 +833,113 @@ def get_cached_pages(self) -> Iterator[interfaces.objects.ObjectInterface]: page = self.vmlinux.object("page", offset=page_addr, absolute=True) if page: yield page + + +class VMCoreInfo(interfaces.configuration.VersionableInterface): + _required_framework_version = (2, 11, 0) + + _version = (1, 0, 0) + + @classmethod + def _vmcoreinfo_data_to_dict( + cls, + vmcoreinfo_data, + ) -> Optional[Dict[str, str]]: + """Converts the input VMCoreInfo data buffer into a dictionary""" + + # Ensure the whole buffer is printable + if not all(c in string.printable.encode() for c in vmcoreinfo_data): + # Abort, we are in the wrong place + return None + + vmcoreinfo_dict = dict() + for line in vmcoreinfo_data.decode().splitlines(): + if not line: + break + + key, value = line.split("=", 1) + vmcoreinfo_dict[key] = cls._parse_value(key, value) + + return vmcoreinfo_dict + + @classmethod + def _parse_value(cls, key, value): + if key.startswith("SYMBOL(") or key == "KERNELOFFSET": + return int(value, 16) + elif key.startswith(("NUMBER(", "LENGTH(", "SIZE(", "OFFSET(")): + return int(value, 0) + elif key == "PAGESIZE": + return int(value, 0) + + # Default, as string + return value + + @classmethod + def search_vmcoreinfo_elf_note( + cls, + context: interfaces.context.ContextInterface, + layer_name: str, + progress_callback: constants.ProgressCallback = None, + ) -> Iterator[Tuple[int, Dict[str, str]]]: + """Enumerates each VMCoreInfo ELF note table found in memory along with its offset. + + This approach is independent of any external ISF symbol or type, requiring only the + Elf64_Note found in 'elf.json', which is already included in the framework. + + Args: + context: The context to retrieve required elements (layers, symbol tables) from + layer_name: The layer within the context in which the module exists + progress_callback: A function that takes a percentage (and an optional description) that will be called periodically + + Yields: + Tuples with the VMCoreInfo ELF note offset and the VMCoreInfo table parsed in a dictionary. + """ + + elf_table_name = intermed.IntermediateSymbolTable.create( + context, "elf_symbol_table", "linux", "elf" + ) + module = context.module(elf_table_name, layer_name, 0) + layer = context.layers[layer_name] + + # Both Elf32_Note and Elf64_Note are of the same size + elf_note_size = context.symbol_space[elf_table_name].get_type("Elf64_Note").size + + for vmcoreinfo_offset in layer.scan( + scanner=scanners.BytesScanner(linux_constants.VMCOREINFO_MAGIC_ALIGNED), + context=context, + progress_callback=progress_callback, + ): + # vmcoreinfo_note kernels >= 2.6.24 fd59d231f81cb02870b9cf15f456a897f3669b4e + vmcoreinfo_elf_note_offset = vmcoreinfo_offset - elf_note_size + + # Elf32_Note and Elf64_Note are identical, so either can be used interchangeably here + elf_note = module.object( + object_type="Elf64_Note", + offset=vmcoreinfo_elf_note_offset, + absolute=True, + ) + + # Ensure that we are within an ELF note + if ( + elf_note.n_namesz != len(linux_constants.VMCOREINFO_MAGIC) + or elf_note.n_type != 0 + or elf_note.n_descsz == 0 + ): + continue + + vmcoreinfo_data_offset = vmcoreinfo_offset + len( + linux_constants.VMCOREINFO_MAGIC_ALIGNED + ) + + # Also, confirm this with the first tag, which has consistently been OSRELEASE + vmcoreinfo_data = layer.read(vmcoreinfo_data_offset, elf_note.n_descsz) + if not vmcoreinfo_data.startswith(linux_constants.OSRELEASE_TAG): + continue + + table = cls._vmcoreinfo_data_to_dict(vmcoreinfo_data) + if not table: + # Wrong VMCoreInfo note offset, keep trying + continue + + # A valid VMCoreInfo ELF note exists at 'vmcoreinfo_elf_note_offset' + yield vmcoreinfo_elf_note_offset, table