From 22be8a2b4a0af3b95e5ecb186fe1b95b4dc2a7ee Mon Sep 17 00:00:00 2001 From: the_janitor Date: Fri, 22 Dec 2023 08:38:53 +0100 Subject: [PATCH] Added handling of prototype and VAD hardware / unknown PTEs on Windows x64 --- volatility3/framework/layers/intel.py | 133 +++++++++++++++++- .../symbols/windows/extensions/__init__.py | 6 +- 2 files changed, 136 insertions(+), 3 deletions(-) diff --git a/volatility3/framework/layers/intel.py b/volatility3/framework/layers/intel.py index 7d3b86a12c..cc8daa4e93 100644 --- a/volatility3/framework/layers/intel.py +++ b/volatility3/framework/layers/intel.py @@ -418,6 +418,9 @@ class Intel32e(Intel): class WindowsMixin(Intel): + + _vad_root = None + @staticmethod def _page_is_valid(entry: int) -> bool: """Returns whether a particular page is valid based on its entry. @@ -431,6 +434,24 @@ def _page_is_valid(entry: int) -> bool: """ return bool((entry & 1) or ((entry & 1 << 11) and not entry & 1 << 10)) + def _is_prototype_pte(self, entry: int) -> bool: + return bool((entry & (1 << 10)) and (not super()._page_is_valid(entry))) + + def _is_transition_pte(self, entry: int) -> bool: + return bool((entry & (1 << 11)) and (not super()._page_is_valid(entry))) + + def _process_transition_pte(self, pte: int, offset: int) -> Tuple[int, int, str]: + """Resolves a Transition PTE""" + return None + + def _process_prototype_pte(self, pte: int, offset: int) -> Tuple[int, int, str]: + """Resolves a Prototype PTE""" + return None + + def _process_vad_hardware_pte(self, pte: int, offset: int) -> Tuple[int, int, str]: + """Resolves a VAD Hardware / Unknown PTE""" + return None + def _translate_swap( self, layer: Intel, offset: int, bit_offset: int ) -> Tuple[int, int, str]: @@ -438,8 +459,8 @@ def _translate_swap( return super()._translate(offset) except exceptions.PagedInvalidAddressException as excp: entry = excp.entry - tbit = bool(entry & (1 << 11)) - pbit = bool(entry & (1 << 10)) + tbit = self._is_transition_pte(entry) + pbit = self._is_prototype_pte(entry) unknown_bit = bool(entry & (1 << 7)) n = (entry >> 1) & 0xF vbit = bool(entry & 1) @@ -464,6 +485,22 @@ def _translate_swap( entry=excp.entry, swap_offset=swap_offset, ) + if isinstance(self, WindowsIntel32e): + # Handle Prototype PTEs + try: + result = self._process_prototype_pte(entry, offset) + if result is not None: + return result + except: + raise + # Handle VAD hardware / unknown PTEs + if entry == 0: + try: + result = self._process_vad_hardware_pte(entry, offset) + if result is not None: + return result + except: + raise raise @@ -487,5 +524,97 @@ class WindowsIntel32e(WindowsMixin, Intel32e): # See https://github.com/volatilityfoundation/volatility3/pull/475 _maxphyaddr = 45 + def _process_transition_pte(self, pte: int, offset: int) -> Tuple[int, int, str]: + """Resolves a Transition PTE""" + # Check if it's a transition PTE + if not self._is_transition_pte(pte): + return None + + # _MMPTE_TRANSITION::PageFrameNumber + page = self._mask(pte, self._maxphyaddr - 1, self._page_size_in_bits) | \ + self._mask(offset, self._page_size_in_bits - 1, 0) + + return page, 1 << self._page_size_in_bits, self._base_layer + + def _process_prototype_pte(self, pte: int, offset: int) -> Tuple[int, int, str]: + """Resolves a Prototype PTE""" + # Check if it's a prototype PTE + if not self._is_prototype_pte(pte): + return None + + # _MMPTE_PROTOTYPE::ProtoAddress + proto_addr = self.canonicalize(pte >> 16) + + if proto_addr == 0xffffffffffff0000: + return self._process_vad_hardware_pte(proto_addr, offset) + + # Translate the prototype address + pte_addr_pa = super()._translate(proto_addr) + + # Read and unpack the PTE + entry = self._context.layers.read(self._base_layer, pte_addr_pa[0], + self._bits_per_register // 8) + + (entry, ) = struct.unpack(self._entry_format, entry) + + # Check if the entry is valid + if super()._page_is_valid(entry): + + # _MMPTE_HARDWARE::PageFrameNumber + page = self._mask(entry, self._maxphyaddr - 1, self._page_size_in_bits) | \ + self._mask(offset, self._page_size_in_bits - 1, 0) + + return page, 1 << self._page_size_in_bits, self._base_layer + + return None + + def _process_vad_hardware_pte(self, pte: int, offset: int) -> Tuple[int, int, str]: + """Resolves a VAD hardware / unknown PTE""" + # Check if it's a potential VAD hardware PTE + if super()._page_is_valid(pte) or \ + self._is_prototype_pte(pte) or \ + self._is_transition_pte(pte): + return None + + if self._vad_root is None: + return None + + # Walk the VAD + # TODO: Make the lookup more efficient + for vad in self._vad_root.traverse(): + if (offset < vad.get_start()) or (offset > vad.get_end()): + continue + + if (not vad.has_member("FirstPrototypePte")) or \ + (not vad.has_member("LastContiguousPte")): + continue + + pte_ofs = ((offset - vad.get_start()) >> self._page_size_in_bits) * \ + (self._bits_per_register // 8) + + if pte_ofs > (vad.LastContiguousPte - vad.FirstPrototypePte): + continue + + pte_addr = self.canonicalize(vad.FirstPrototypePte + pte_ofs) + pte_addr_pa = super()._translate(pte_addr) + + # Read and unpack the PTE + entry = self._context.layers.read(self._base_layer, pte_addr_pa[0], + self._bits_per_register // 8) + + (pte, ) = struct.unpack(self._entry_format, entry) + + # Handle Transition PTEs + if self._is_transition_pte(pte): + return self._process_transition_pte(pte, offset) + + # Handle Prototype PTEs + if self._is_prototype_pte(pte): + return self._process_prototype_pte(pte, offset) + + break + + return None + def _translate(self, offset: int) -> Tuple[int, int, str]: return self._translate_swap(self, offset, self._bits_per_register // 2) diff --git a/volatility3/framework/symbols/windows/extensions/__init__.py b/volatility3/framework/symbols/windows/extensions/__init__.py index d435851d73..e7ef8e1faf 100755 --- a/volatility3/framework/symbols/windows/extensions/__init__.py +++ b/volatility3/framework/symbols/windows/extensions/__init__.py @@ -625,9 +625,13 @@ def add_process_layer(self, config_prefix: str = None, preferred_name: str = Non preferred_name = self.vol.layer_name + f"_Process{self.UniqueProcessId}" # Add the constructed layer and return the name - return self._add_process_layer( + ret_layer = self._add_process_layer( self._context, dtb, config_prefix, preferred_name ) + + self._context.layers[ret_layer]._vad_root = self.get_vad_root() + + return ret_layer def get_peb(self) -> interfaces.objects.ObjectInterface: """Constructs a PEB object"""