Better pagefault handling - with swap file support #778

Open
wants to merge 11 commits into develop
222 changes: 137 additions & 85 deletions volatility3/framework/layers/intel.py
@@ -41,6 +41,7 @@ def __init__(self,
self._base_layer = self.config["memory_layer"]
self._swap_layers: List[str] = []
self._page_map_offset = self.config["page_map_offset"]
self._kernel: Optional[interfaces.context.ModuleInterface] = None

# Assign constants
self._initial_position = min(self._maxvirtaddr, self._bits_per_register) - 1
@@ -50,6 +51,7 @@ def __init__(self,

# These can vary depending on the type of space
self._index_shift = int(math.ceil(math.log2(struct.calcsize(self._entry_format))))
self._structure_position_table: Dict[int, Tuple[str, int, bool]] = {}

@classproperty
@functools.lru_cache()
@@ -102,70 +104,84 @@ def _translate(self, offset: int) -> Tuple[int, int, str]:
translated address lives in and the layer_name that the address
lives in
"""
entry, position = self._translate_entry(offset)
# Setup the entry and how far we are through the offset
# Position maintains the number of bits left to process
# We or with 0x1 to ensure our page_map_offset is always valid
position = self._initial_position
entry = self._initial_entry

if self.minimum_address > offset > self.maximum_address:
raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry,
"Entry outside virtual address range: " + hex(entry))

entry, position = self._translate_entry(offset, position, entry)

# Now we're done
if not self._page_is_valid(entry):
raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry,
f"Page Fault at entry {hex(entry)} in page entry")
return self._handle_page_fault(self.name, offset, position + 1, entry, f"Page Fault at entry {hex(entry)} in page entry")

page = self._mask(entry, self._maxphyaddr - 1, position + 1) | self._mask(offset, position, 0)

return page, 1 << (position + 1), self._base_layer
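
The final step combines the page frame bits taken from the entry with the untranslated low bits of the virtual address. A minimal sketch of that arithmetic for a 4 KiB page (position == 11) with illustrative constants; assemble_physical is a hypothetical helper, not part of the layer:

def assemble_physical(entry: int, offset: int, position: int = 11,
                      maxphyaddr: int = 52) -> int:
    # Frame bits come from the PTE: bits position+1 .. maxphyaddr-1
    frame_mask = ((1 << maxphyaddr) - 1) & ~((1 << (position + 1)) - 1)
    page_frame = entry & frame_mask
    # The low position+1 bits pass straight through from the virtual address
    page_offset = offset & ((1 << (position + 1)) - 1)
    return page_frame | page_offset

# Flag bits (present, accessed, ...) in the low PTE bits are masked away:
assert assemble_physical(0x1a2b3027, 0x004017c4) == 0x1a2b37c4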

def _translate_entry(self, offset: int) -> Tuple[int, int]:
def _find_structure_index(self, position: int) -> Tuple[str, int, bool]:
if not self._structure_position_table:
counter = self._initial_position
for name, size, large_page in self._structure:
self._structure_position_table[counter] = name, size, large_page
counter -= size
# print(self._structure_position_table)
return self._structure_position_table[position]
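
The lazily built table maps the highest unprocessed bit position of each paging level to its (name, size, large_page) triple. A sketch of the resulting contents for a 48-bit, four-level address space, assuming the _structure definition used by Intel32e:

structure = [("page map layer 4", 9, False),
             ("page directory pointer", 9, True),
             ("page directory", 9, True),
             ("page table", 9, True)]
table = {}
counter = 47                      # _initial_position for a 48-bit space
for name, size, large_page in structure:
    table[counter] = (name, size, large_page)
    counter -= size
# table == {47: ("page map layer 4", 9, False),
#           38: ("page directory pointer", 9, True),
#           29: ("page directory", 9, True),
#           20: ("page table", 9, True)}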

def _handle_page_fault(self, layer_name, offset, invalid_bits, entry, description):
"""Handles page faults by raising the appropriate exception with the caller's description"""
raise exceptions.PagedInvalidAddressException(layer_name, offset, invalid_bits, entry, description)

def _translate_entry(self, offset: int, position: int, entry: int) -> Tuple[int, int]:
"""Translates a specific offset based on paging tables.

Returns the translated entry value and the remaining bit position
"""
# Setup the entry and how far we are through the offset
# Position maintains the number of bits left to process
# We or with 0x1 to ensure our page_map_offset is always valid
position = self._initial_position
entry = self._initial_entry

if self.minimum_address > offset > self.maximum_address:
name, size, large_page = self._find_structure_index(position)

# Check we're present
if not entry & 0x1:
return self._handle_page_fault(self.name, offset, position + 1, entry,
"Page Fault at entry " + hex(entry) + " in table " + name)
# Check if we're a large page
if large_page and (entry & (1 << 7)):
# We're a large page, the rest is finished below
# If we want to implement PSE-36, it would need to be done here
return entry, position
# Figure out how much of the offset we should be using
start = position
position -= size
index = self._mask(offset, start, position + 1) >> (position + 1)

# Grab the base address of the table we'll be getting the next entry from
base_address = self._mask(entry, self._maxphyaddr - 1, size + self._index_shift)

table = self._get_valid_table(base_address)
if table is None:
raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry,
"Entry outside virtual address range: " + hex(entry))
"Page Fault at entry " + hex(entry) + " in table " + name)

# Read the data for the next entry
entry_data = table[(index << self._index_shift):(index << self._index_shift) + self._entry_size]

if INTEL_TRANSLATION_DEBUGGING:
vollog.log(
constants.LOGLEVEL_VVVV, "Entry {} at index {} gives data {} as {}".format(
hex(entry), hex(index), hex(struct.unpack(self._entry_format, entry_data)[0]), name))

# Run through the offset in various chunks
for (name, size, large_page) in self._structure:
# Check we're valid
if not self._page_is_valid(entry):
raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry,
"Page Fault at entry " + hex(entry) + " in table " + name)
# Check if we're a large page
if large_page and (entry & (1 << 7)):
# Mask off the PAT bit
if entry & (1 << 12):
entry -= (1 << 12)
# We're a large page, the rest is finished below
# If we want to implement PSE-36, it would need to be done here
break
# Figure out how much of the offset we should be using
start = position
position -= size
index = self._mask(offset, start, position + 1) >> (position + 1)

# Grab the base address of the table we'll be getting the next entry from
base_address = self._mask(entry, self._maxphyaddr - 1, size + self._index_shift)

table = self._get_valid_table(base_address)
if table is None:
raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry,
"Page Fault at entry " + hex(entry) + " in table " + name)

# Read the data for the next entry
entry_data = table[(index << self._index_shift):(index << self._index_shift) + self._entry_size]

if INTEL_TRANSLATION_DEBUGGING:
vollog.log(
constants.LOGLEVEL_VVVV, "Entry {} at index {} gives data {} as {}".format(
hex(entry), hex(index), hex(struct.unpack(self._entry_format, entry_data)[0]), name))

# Read out the new entry from memory
entry, = struct.unpack(self._entry_format, entry_data)

return entry, position
# Read out the new entry from memory
entry, = struct.unpack(self._entry_format, entry_data)

if position < self._page_size_in_bits:
return entry, position

return self._translate_entry(offset, position, entry)
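
Each recursive call consumes one level: it pulls the next 9-bit index out of the offset, fetches the entry it selects, then recurses until fewer than _page_size_in_bits bits remain. An illustrative trace of the index extraction for a 48-bit address with 4 KiB pages (the address is made up):

vaddr = 0x7ffd12345678            # hypothetical virtual address
position = 47
for name in ("PML4", "PDPT", "PD", "PT"):
    shift = position - 8          # low bit of this level's 9-bit index
    index = (vaddr >> shift) & 0x1ff
    print(f"{name}: index {index:#x} (bits {position}..{shift})")
    position -= 9
# position is now 11 < _page_size_in_bits (12), so the recursion stops
# and _translate appends the low 12 bits of vaddr to the final entry.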

@functools.lru_cache(1025)
def _get_valid_table(self, base_address: int) -> Optional[bytes]:
@@ -307,6 +323,47 @@ class Intel32e(Intel):


class WindowsMixin(Intel):
_swap_bit_offset = 32
def __init__(self,
context: interfaces.context.ContextInterface,
config_path: str,
name: str,
metadata: Optional[Dict[str, Any]] = None) -> None:
super().__init__(context = context, config_path = config_path, name = name, metadata = metadata)
self._kernel: Optional[interfaces.context.ModuleInterface] = self._get_kernel_module()

def _get_kernel_module(self) -> Optional[interfaces.context.ModuleInterface]:
kvo = self.config.get('kernel_virtual_offset', None)
if kvo is None:
return None

for module_name in self.context.modules:
if self.context.modules[module_name].offset == kvo:
return self.context.modules[module_name]

return None
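
The kernel module is identified purely by matching the configured kernel_virtual_offset against the offsets of the loaded modules. A minimal self-contained sketch of that matching, using a hypothetical module table:

from types import SimpleNamespace

kvo = 0xf80362a00000              # hypothetical kernel_virtual_offset
modules = {"kernel": SimpleNamespace(offset = 0xf80362a00000)}
kernel = next((modules[m] for m in modules if modules[m].offset == kvo), None)
assert kernel is modules["kernel"]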

@functools.lru_cache()
def _get_invalid_pte_mask(self, kernel):
if kernel.has_symbol("MiInvalidPteMask"):
pte_size = kernel.get_type("_MMPTE_HARDWARE").vol.size
pte_type = "unsigned int"
if pte_size == 8:
pte_type = "unsigned long long"

return kernel.object(pte_type, offset=kernel.get_symbol("MiInvalidPteMask").address)

if kernel.has_symbol("MiState"):
system_information = kernel.object("_MI_SYSTEM_INFORMATION", offset=kernel.get_symbol("MiState").address)
return system_information.Hardware.InvalidPteMask

return 0
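
On builds that obfuscate invalid PTE contents, Windows folds a per-boot MiInvalidPteMask into software PTEs; the page fault handler undoes this with the same XOR before decoding. A round-trip sketch with made-up values:

invalid_pte_mask = 0x2000000000000000    # hypothetical per-boot mask
software_pte = (0x1a2b << 32) | (3 << 1)      # swap page 0x1a2b, pagefile 3
stored = software_pte ^ invalid_pte_mask      # what sits in the page table
assert stored ^ invalid_pte_mask == software_pte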

@functools.lru_cache()
def _get_PageFileLow_shift(self, kernel):
mmpte_software_type = kernel.get_type("_MMPTE_SOFTWARE")

return mmpte_software_type.vol.members["PageFileLow"][1].start_bit
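
The shift is read from the symbol tables rather than hard-coded because the bit layout of _MMPTE_SOFTWARE varies between Windows builds. A sketch of using it to pull the 4-bit pagefile number out of a swapped-out PTE, assuming a start bit of 1:

pagefile_low_shift = 1                  # hypothetical start_bit value
entry = (0x1a2b << 32) | (3 << 1)       # swapped-out PTE, pagefile number 3
n = (entry >> pagefile_low_shift) & 0xf
assert n == 3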

@staticmethod
def _page_is_valid(entry: int) -> bool:
@@ -321,53 +378,48 @@ def _page_is_valid(entry: int) -> bool:
"""
return bool((entry & 1) or ((entry & 1 << 11) and not entry & 1 << 10))
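
Concretely: bit 0 is the hardware valid bit, bit 11 the transition bit and bit 10 the prototype bit, so a transition, non-prototype page still counts as resident. A small truth table exercising the check:

cases = {
    0b000000000001: True,    # hardware-valid page
    0b100000000000: True,    # transition page, still resident in RAM
    0b110000000000: False,   # prototype PTE, not directly resolvable
    0b000000000000: False,   # swapped out (or demand zero)
}
for entry, expected in cases.items():
    assert bool((entry & 1) or ((entry & 1 << 11) and not entry & 1 << 10)) == expected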

def _translate_swap(self, layer: Intel, offset: int, bit_offset: int) -> Tuple[int, int, str]:
try:
return super()._translate(offset)
except exceptions.PagedInvalidAddressException as excp:
entry = excp.entry
tbit = bool(entry & (1 << 11))
pbit = bool(entry & (1 << 10))
unknown_bit = bool(entry & (1 << 7))
n = (entry >> 1) & 0xF
vbit = bool(entry & 1)
if (not tbit and not pbit and not vbit and unknown_bit) and ((entry >> bit_offset) != 0):
swap_offset = entry >> bit_offset << excp.invalid_bits

if layer.config.get('swap_layers', False):
swap_layer_name = layer.config.get(
interfaces.configuration.path_join('swap_layers', 'swap_layers' + str(n)), None)
if swap_layer_name:
return swap_offset, 1 << excp.invalid_bits, swap_layer_name
raise exceptions.SwappedInvalidAddressException(layer_name = excp.layer_name,
invalid_address = excp.invalid_address,
invalid_bits = excp.invalid_bits,
entry = excp.entry,
swap_offset = swap_offset)
raise
def _handle_page_fault(self, layer_name, offset, invalid_bits, entry, description):
kernel = self._kernel
if kernel is None:
raise exceptions.PagedInvalidAddressException(self.name, offset, invalid_bits, entry, "kernel module not found!")

tbit = bool(entry & (1 << 11))
pbit = bool(entry & (1 << 10))
vbit = bool(entry & 1)
entry ^= self._get_invalid_pte_mask(kernel)

# Handle Swap failure
if (not tbit and not pbit and not vbit) and ((entry >> self._swap_bit_offset) != 0):
swap_offset = entry >> self._swap_bit_offset << invalid_bits
n = (entry >> self._get_PageFileLow_shift(kernel)) & 0xF

if self.config.get('swap_layers', False):
swap_layer_name = self.config.get(
interfaces.configuration.path_join('swap_layers', 'swap_layers' + str(n)), None)
if swap_layer_name:
return swap_offset, 1 << invalid_bits, swap_layer_name
raise exceptions.SwappedInvalidAddressException(layer_name = layer_name,
invalid_address = offset,
invalid_bits = invalid_bits,
entry = entry,
swap_offset = swap_offset)
return super()._handle_page_fault(layer_name, offset, invalid_bits, entry, description)
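
Putting the pieces together, a swapped-out x64 PTE decodes into a pagefile number and a byte offset within that pagefile. With _swap_bit_offset == 32 and 4 KiB pages (invalid_bits == 12) the arithmetic looks like this (values are made up):

entry = (0x1a2b << 32) | (3 << 1)             # swap page 0x1a2b in pagefile 3
invalid_bits = 12
swap_offset = (entry >> 32) << invalid_bits   # byte offset in the swap layer
n = (entry >> 1) & 0xf                        # which pagefile to read
assert (swap_offset, n) == (0x1a2b000, 3)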


### These must be full separate classes so that JSON configs re-create them properly


class WindowsIntel(WindowsMixin, Intel):

def _translate(self, offset):
return self._translate_swap(self, offset, self._page_size_in_bits)
pass


class WindowsIntelPAE(WindowsMixin, IntelPAE):

def _translate(self, offset: int) -> Tuple[int, int, str]:
return self._translate_swap(self, offset, self._bits_per_register)
pass


class WindowsIntel32e(WindowsMixin, Intel32e):
# TODO: Fix appropriately in a future release.
# Currently just a temporary workaround to deal with custom bit flag
# in the PFN field for pages in transition state.
# See https://github.com/volatilityfoundation/volatility3/pull/475
_maxphyaddr = 45

def _translate(self, offset: int) -> Tuple[int, int, str]:
return self._translate_swap(self, offset, self._bits_per_register // 2)
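
With swap layers configured, a resolved swap read yields an (offset, size, layer_name) triple pointing into the pagefile layer instead of raising. A hypothetical configuration lookup mirroring the keys the mixin queries (swap_layers.swap_layers<n>):

config = {"swap_layers": True,
          "swap_layers.swap_layers3": "pagefile_layer"}   # hypothetical names
n, swap_offset, invalid_bits = 3, 0x1a2b000, 12
layer_name = config.get(f"swap_layers.swap_layers{n}")
if layer_name:
    result = (swap_offset, 1 << invalid_bits, layer_name)
assert result == (0x1a2b000, 0x1000, "pagefile_layer")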