Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added handling of prototype and VAD hardware / unknown PTEs on Windows x64 #1073

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 131 additions & 2 deletions volatility3/framework/layers/intel.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ class Intel32e(Intel):


class WindowsMixin(Intel):

_vad_root = None

@staticmethod
def _page_is_valid(entry: int) -> bool:
"""Returns whether a particular page is valid based on its entry.
Expand All @@ -443,15 +446,33 @@ def _page_is_valid(entry: int) -> bool:
"""
return bool((entry & 1) or ((entry & 1 << 11) and not entry & 1 << 10))

def _is_prototype_pte(self, entry: int) -> bool:
return bool((entry & (1 << 10)) and (not super()._page_is_valid(entry)))

def _is_transition_pte(self, entry: int) -> bool:
return bool((entry & (1 << 11)) and (not super()._page_is_valid(entry)))

def _process_transition_pte(self, pte: int, offset: int) -> Tuple[int, int, str]:
"""Resolves a Transition PTE"""
return None

def _process_prototype_pte(self, pte: int, offset: int) -> Tuple[int, int, str]:
"""Resolves a Prototype PTE"""
return None

def _process_vad_hardware_pte(self, pte: int, offset: int) -> Tuple[int, int, str]:
"""Resolves a VAD Hardware / Unknown PTE"""
return None

def _translate_swap(
self, layer: Intel, offset: int, bit_offset: int
) -> Tuple[int, int, str]:
try:
return super()._translate(offset)
except exceptions.PagedInvalidAddressException as excp:
entry = excp.entry
tbit = bool(entry & (1 << 11))
pbit = bool(entry & (1 << 10))
tbit = self._is_transition_pte(entry)
pbit = self._is_prototype_pte(entry)
unknown_bit = bool(entry & (1 << 7))
n = (entry >> 1) & 0xF
vbit = bool(entry & 1)
Expand All @@ -476,6 +497,22 @@ def _translate_swap(
entry=excp.entry,
swap_offset=swap_offset,
)
if isinstance(self, WindowsIntel32e):
# Handle Prototype PTEs
try:
result = self._process_prototype_pte(entry, offset)
if result is not None:
return result
except:
raise
# Handle VAD hardware / unknown PTEs
if entry == 0:
try:
result = self._process_vad_hardware_pte(entry, offset)
if result is not None:
return result
except:
raise
raise


Expand All @@ -499,5 +536,97 @@ class WindowsIntel32e(WindowsMixin, Intel32e):
# See https://github.com/volatilityfoundation/volatility3/pull/475
_maxphyaddr = 45

def _process_transition_pte(self, pte: int, offset: int) -> Tuple[int, int, str]:
"""Resolves a Transition PTE"""
# Check if it's a transition PTE
if not self._is_transition_pte(pte):
return None

# _MMPTE_TRANSITION::PageFrameNumber
page = self._mask(pte, self._maxphyaddr - 1, self._page_size_in_bits) | \
self._mask(offset, self._page_size_in_bits - 1, 0)

return page, 1 << self._page_size_in_bits, self._base_layer

def _process_prototype_pte(self, pte: int, offset: int) -> Tuple[int, int, str]:
"""Resolves a Prototype PTE"""
# Check if it's a prototype PTE
if not self._is_prototype_pte(pte):
return None

# _MMPTE_PROTOTYPE::ProtoAddress
proto_addr = self.canonicalize(pte >> 16)

if proto_addr == 0xffffffffffff0000:
return self._process_vad_hardware_pte(proto_addr, offset)

# Translate the prototype address
pte_addr_pa = super()._translate(proto_addr)

# Read and unpack the PTE
entry = self._context.layers.read(self._base_layer, pte_addr_pa[0],
self._bits_per_register // 8)

(entry, ) = struct.unpack(self._entry_format, entry)

# Check if the entry is valid
if super()._page_is_valid(entry):

# _MMPTE_HARDWARE::PageFrameNumber
page = self._mask(entry, self._maxphyaddr - 1, self._page_size_in_bits) | \
self._mask(offset, self._page_size_in_bits - 1, 0)

return page, 1 << self._page_size_in_bits, self._base_layer

return None

def _process_vad_hardware_pte(self, pte: int, offset: int) -> Tuple[int, int, str]:
"""Resolves a VAD hardware / unknown PTE"""
# Check if it's a potential VAD hardware PTE
if super()._page_is_valid(pte) or \
self._is_prototype_pte(pte) or \
self._is_transition_pte(pte):
return None

if self._vad_root is None:
return None

# Walk the VAD
# TODO: Make the lookup more efficient
for vad in self._vad_root.traverse():
if (offset < vad.get_start()) or (offset > vad.get_end()):
continue

if (not vad.has_member("FirstPrototypePte")) or \
(not vad.has_member("LastContiguousPte")):
continue

pte_ofs = ((offset - vad.get_start()) >> self._page_size_in_bits) * \
(self._bits_per_register // 8)

if pte_ofs > (vad.LastContiguousPte - vad.FirstPrototypePte):
continue

pte_addr = self.canonicalize(vad.FirstPrototypePte + pte_ofs)
pte_addr_pa = super()._translate(pte_addr)

# Read and unpack the PTE
entry = self._context.layers.read(self._base_layer, pte_addr_pa[0],
self._bits_per_register // 8)

(pte, ) = struct.unpack(self._entry_format, entry)

# Handle Transition PTEs
if self._is_transition_pte(pte):
return self._process_transition_pte(pte, offset)

# Handle Prototype PTEs
if self._is_prototype_pte(pte):
return self._process_prototype_pte(pte, offset)

break

return None

def _translate(self, offset: int) -> Tuple[int, int, str]:
return self._translate_swap(self, offset, self._bits_per_register // 2)
6 changes: 5 additions & 1 deletion volatility3/framework/symbols/windows/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,13 @@ def add_process_layer(self, config_prefix: str = None, preferred_name: str = Non
preferred_name = self.vol.layer_name + f"_Process{self.UniqueProcessId}"

# Add the constructed layer and return the name
return self._add_process_layer(
ret_layer = self._add_process_layer(
self._context, dtb, config_prefix, preferred_name
)

self._context.layers[ret_layer]._vad_root = self.get_vad_root()

return ret_layer

def get_peb(self) -> interfaces.objects.ObjectInterface:
"""Constructs a PEB object"""
Expand Down