Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linux: Address limitations in determining KASLR shifts by introducing VMCoreInfo support #1332

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
512c1ab
linux: VMCoreInfo plugin: Add VMCoreInfo API and its respective plugin
gcmoreira Nov 4, 2024
b4b7834
linux: K/ASLR via VMCoreInfo
gcmoreira Nov 4, 2024
6610779
linux: VMCoreInfo: Remove debug comment
gcmoreira Nov 4, 2024
5ab9756
linux: intel VMCoreInfo: Move to its own stacker...
gcmoreira Nov 14, 2024
00b528a
linux: intel VMCoreInfo: Fix docstring typo
gcmoreira Nov 14, 2024
41a9347
linux: intel VMCoreInfo: Improve layer config code
gcmoreira Nov 14, 2024
4c0ddca
linux: intel VMCoreInfo: BugFix. Return layer if both values are present
gcmoreira Nov 18, 2024
4a88a74
linux: VMCoreInfo API: Integrate support for value parsing within the…
gcmoreira Nov 18, 2024
f2a2f8a
linux: intel VMCoreInfo: immediately abort processing the current VMC…
gcmoreira Nov 18, 2024
d80ef55
linux: intel VMCoreInfo: Select the fastest scanner for each scenario
gcmoreira Nov 18, 2024
13729ac
linux: VMCoreInfo API: AARCH64 uses NUMBER() with hex values. Fortuna…
gcmoreira Nov 18, 2024
96f5fda
linux: intel VMCoreInfo: Add the vmcoreinfo to the layer metadata
gcmoreira Nov 19, 2024
9f6d194
linux: intel VMCoreInfo: The _direct_metadata class attribute needs t…
gcmoreira Nov 19, 2024
41d2e92
linux: intel VMCoreInfo: Revert changes associated with adding vmcore…
gcmoreira Nov 20, 2024
bbd2f9b
linux: intel VMCoreInfo: Fix bug introduced in last commit reverting …
gcmoreira Nov 22, 2024
03aba8d
Merge branch 'develop' into linux_vmcoreinfo_aslr_and_plugin
gcmoreira Dec 19, 2024
d14c777
linux: vmcoreinfo layer: Update to use the new cache-manager
gcmoreira Dec 19, 2024
d56f3a5
linux: vmcoreinfo layer: Add review suggestions
gcmoreira Dec 19, 2024
7858af3
linux: vmcoreinfo layer: remove unused import
gcmoreira Dec 19, 2024
7ed8b99
linux: vmcoreinfo layer: Use the version class accessor instead of th…
gcmoreira Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 228 additions & 4 deletions volatility3/framework/automagic/linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from volatility3.framework import constants, interfaces
from volatility3.framework.automagic import symbol_cache, symbol_finder
from volatility3.framework.configuration import requirements
from volatility3.framework.layers import intel, scanners
from volatility3.framework.symbols import linux

Expand Down Expand Up @@ -61,8 +62,12 @@ def stack(
isf_url=isf_path,
)
context.symbol_space.append(table)

kaslr_shift, aslr_shift = cls.find_aslr(
context, table_name, layer_name, progress_callback=progress_callback
context,
table_name,
layer_name,
progress_callback=progress_callback,
)

if "init_top_pgt" in table.symbols:
Expand Down Expand Up @@ -111,7 +116,17 @@ def find_aslr(
progress_callback: constants.ProgressCallback = None,
) -> Tuple[int, int]:
"""Determines the offset of the actual DTB in physical space and its
symbol offset."""
symbol offset.

Args:
context: The context to retrieve required elements (layers, symbol tables) from
symbol_table: The name of the kernel module on which to operate
layer_name: The layer within the context in which the module exists
progress_callback: A function that takes a percentage (and an optional description) that will be called periodically

Returns:
kaslr_shirt and aslr_shift
"""
init_task_symbol = symbol_table + constants.BANG + "init_task"
init_task_json_address = context.symbol_space.get_symbol(
init_task_symbol
Expand Down Expand Up @@ -179,8 +194,8 @@ def find_aslr(
vollog.debug("Scanners could not determine any ASLR shifts, using 0 for both")
return 0, 0

@classmethod
def virtual_to_physical_address(cls, addr: int) -> int:
@staticmethod
def virtual_to_physical_address(addr: int) -> int:
"""Converts a virtual linux address to a physical one (does not account
of ASLR)"""
if addr > 0xFFFFFFFF80000000:
Expand All @@ -199,3 +214,212 @@ class LinuxSymbolFinder(symbol_finder.SymbolFinder):
@classmethod
def find_aslr(cls, *args):
return LinuxIntelStacker.find_aslr(*args)[1]


class LinuxIntelVMCOREINFOStacker(interfaces.automagic.StackerLayerInterface):
stack_order = 34
exclusion_list = ["mac", "windows"]

@staticmethod
def _check_versions() -> bool:
"""Verify the versions of the required modules"""
# Check VMCOREINFO API version
vmcoreinfo_version_required = (1, 0, 0)
if not requirements.VersionRequirement.matches_required(
vmcoreinfo_version_required, linux.VMCoreInfo.version
):
vollog.info(
"VMCOREINFO version not suitable: required %s found %s",
vmcoreinfo_version_required,
linux.VMCoreInfo.version,
)
return False

return True

@classmethod
def stack(
cls,
context: interfaces.context.ContextInterface,
layer_name: str,
progress_callback: constants.ProgressCallback = None,
) -> Optional[interfaces.layers.DataLayerInterface]:
"""Attempts to identify linux within this layer."""

# Verify the versions of the required modules
if not cls._check_versions():
return None

# Bail out by default unless we can stack properly
layer = context.layers[layer_name]

# Never stack on top of an intel layer
# FIXME: Find a way to improve this check
if isinstance(layer, intel.Intel):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this will go wrong for virtualized hosts. We're still a little way off supporting them yet, but definitely one not to forget about...

return None

linux_banners = symbol_cache.load_cache_manager().get_identifier_dictionary(
operating_system="linux"
)
if not linux_banners:
# If we have no banners, don't bother scanning
vollog.info(
"No Linux banners found - if this is a linux plugin, please check your "
"symbol files location"
)
return None

vmcoreinfo_elf_notes_iter = linux.VMCoreInfo.search_vmcoreinfo_elf_note(
context=context,
layer_name=layer_name,
progress_callback=progress_callback,
)

# Iterate through each VMCOREINFO ELF note found, using the first one that is valid.
for _vmcoreinfo_offset, vmcoreinfo in vmcoreinfo_elf_notes_iter:
shifts = cls._vmcoreinfo_find_aslr(vmcoreinfo)
if not shifts:
# Let's try the next VMCOREINFO, in case this one isn't correct.
continue

kaslr_shift, aslr_shift = shifts

dtb = cls._vmcoreinfo_get_dtb(vmcoreinfo, aslr_shift, kaslr_shift)
if dtb is None:
# Discard this VMCOREINFO immediately
continue

is_32bit, is_pae = cls._vmcoreinfo_is_32bit(vmcoreinfo)
if is_32bit:
layer_class = intel.IntelPAE if is_pae else intel.Intel
else:
layer_class = intel.Intel32e

uts_release = vmcoreinfo["OSRELEASE"]

# See how linux_banner constant is built in the linux kernel
linux_version_prefix = f"Linux version {uts_release} (".encode()
valid_banners = [
x for x in linux_banners if x and x.startswith(linux_version_prefix)
]
if not valid_banners:
# There's no banner matching this VMCOREINFO, keep trying with the next one
continue
elif len(valid_banners) == 1:
# Usually, we narrow down the Linux banner list to a single element.
# Using BytesScanner here is slightly faster than MultiStringScanner.
scanner = scanners.BytesScanner(valid_banners[0])
else:
scanner = scanners.MultiStringScanner(valid_banners)

join = interfaces.configuration.path_join
for match in layer.scan(
context=context, scanner=scanner, progress_callback=progress_callback
):
# Unfortunately, the scanners do not maintain a consistent interface
banner = match[1] if isinstance(match, Tuple) else valid_banners[0]
gcmoreira marked this conversation as resolved.
Show resolved Hide resolved

isf_path = linux_banners.get(banner, None)
if not isf_path:
vollog.warning(
"Identified banner %r, but no matching ISF is available.",
banner,
)
continue

vollog.debug("Identified banner: %r", banner)
table_name = context.symbol_space.free_table_name("LintelStacker")
table = linux.LinuxKernelIntermedSymbols(
context,
f"temporary.{table_name}",
name=table_name,
isf_url=isf_path,
)
context.symbol_space.append(table)

# Build the new layer
new_layer_name = context.layers.free_layer_name("primary")
config_path = join("vmcoreinfo", new_layer_name)
kernel_banner = LinuxSymbolFinder.banner_config_key
banner_str = banner.decode(encoding="latin-1")
context.config[join(config_path, kernel_banner)] = banner_str
context.config[join(config_path, "memory_layer")] = layer_name
context.config[join(config_path, "page_map_offset")] = dtb
context.config[join(config_path, "kernel_virtual_offset")] = aslr_shift
layer = layer_class(
context,
config_path=config_path,
name=new_layer_name,
metadata={"os": "Linux"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had you wanted to pass the vmcore through the metadata? If so, here's where you'd do it, I guess? Don't remember if we mind metadata being complex types...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that, I just read back through the old comments and know why that won't work.. We probably should pull the metadata across from the constructed layer after stacking (although loading it from a config won't pull it across, so it might still require a mechanism to repopulate somehow)...

)

if layer:
vollog.debug(
"Values found in VMCOREINFO: KASLR=0x%x, ASLR=0x%x, DTB=0x%x",
kaslr_shift,
aslr_shift,
dtb,
)

return layer

vollog.debug("No suitable linux banner could be matched")
return None

@staticmethod
def _vmcoreinfo_find_aslr(vmcoreinfo) -> Tuple[int, int]:
phys_base = vmcoreinfo.get("NUMBER(phys_base)")
if phys_base is None:
# In kernel < 4.10, there may be a SYMBOL(phys_base), but as noted in the
# c401721ecd1dcb0a428aa5d6832ee05ffbdbffbbe commit comment, this value
# isn't useful for calculating the physical address.
# There's nothing we can do here, so let's try with the next VMCOREINFO or
# the next Stacker.
return None

# kernels 3.14 (b6085a865762236bb84934161273cdac6dd11c2d) KERNELOFFSET was added
kerneloffset = vmcoreinfo.get("KERNELOFFSET")
if kerneloffset is None:
# kernels < 3.14 if KERNELOFFSET is missing, KASLR might not be implemented.
# Oddly, NUMBER(phys_base) is present without it. To be safe, proceed only
# if both are present.
return None

aslr_shift = kerneloffset
kaslr_shift = phys_base + aslr_shift

return kaslr_shift, aslr_shift

@staticmethod
def _vmcoreinfo_get_dtb(vmcoreinfo, aslr_shift, kaslr_shift) -> int:
"""Returns the page global directory physical address (a.k.a DTB or PGD)"""
# In x86-64, since kernels 2.5.22 swapper_pg_dir is a macro to the respective pgd.
# First, in e3ebadd95cb621e2c7436f3d3646447ac9d5c16d to init_level4_pgt, and later
# in kernels 4.13 in 65ade2f872b474fa8a04c2d397783350326634e6) to init_top_pgt.
# In x86-32, the pgd is swapper_pg_dir. So, in any case, for VMCOREINFO
# SYMBOL(swapper_pg_dir) will always have the right value.
dtb_vaddr = vmcoreinfo.get("SYMBOL(swapper_pg_dir)")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we use this twice, should it not be a property or something?

if dtb_vaddr is None:
# Abort, it should be present
return None

dtb_paddr = (
LinuxIntelStacker.virtual_to_physical_address(dtb_vaddr)
- aslr_shift
+ kaslr_shift
)

return dtb_paddr

@staticmethod
def _vmcoreinfo_is_32bit(vmcoreinfo) -> Tuple[bool, bool]:
"""Returns a tuple of booleans with is_32bit and is_pae values"""
is_pae = vmcoreinfo.get("CONFIG_X86_PAE", "n") == "y"
if is_pae:
is_32bit = True
else:
# Check the swapper_pg_dir virtual address size
dtb_vaddr = vmcoreinfo["SYMBOL(swapper_pg_dir)"]
is_32bit = dtb_vaddr <= 2**32

return is_32bit, is_pae
6 changes: 6 additions & 0 deletions volatility3/framework/constants/linux/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,3 +352,9 @@ def flags(self) -> str:
MODULE_MAXIMUM_CORE_SIZE = 20000000
MODULE_MAXIMUM_CORE_TEXT_SIZE = 20000000
MODULE_MINIMUM_SIZE = 4096

# VMCOREINFO
VMCOREINFO_MAGIC = b"VMCOREINFO\x00"
# Aligned to 4 bytes. See storenote() in kernels < 4.19 or append_kcore_note() in kernels >= 4.19
VMCOREINFO_MAGIC_ALIGNED = VMCOREINFO_MAGIC + b"\x00"
OSRELEASE_TAG = b"OSRELEASE="
54 changes: 54 additions & 0 deletions volatility3/framework/plugins/linux/vmcoreinfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

from typing import List

from volatility3.framework import renderers, interfaces
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.symbols import linux
from volatility3.framework.renderers import format_hints


class VMCoreInfo(plugins.PluginInterface):
"""Enumerate VMCoreInfo tables"""

_required_framework_version = (2, 11, 0)
_version = (1, 0, 0)

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.TranslationLayerRequirement(
name="primary", description="Memory layer to scan"
),
requirements.VersionRequirement(
name="VMCoreInfo", component=linux.VMCoreInfo, version=(1, 0, 0)
),
]

def _generator(self):
layer_name = self.config["primary"]
for (
vmcoreinfo_offset,
vmcoreinfo,
) in linux.VMCoreInfo.search_vmcoreinfo_elf_note(
context=self.context,
layer_name=layer_name,
):
for key, value in vmcoreinfo.items():
if key.startswith("SYMBOL(") or key == "KERNELOFFSET":
value = f"0x{value:x}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think this is the same as f"{value:#x}"?

else:
value = str(value)

yield 0, (format_hints.Hex(vmcoreinfo_offset), key, value)

def run(self):
headers = [
("Offset", format_hints.Hex),
("Key", str),
("Value", str),
]
return renderers.TreeGrid(headers, self._generator())
Loading
Loading