diff --git a/volatility3/framework/layers/intel.py b/volatility3/framework/layers/intel.py index 723e4143bc..075efdb762 100644 --- a/volatility3/framework/layers/intel.py +++ b/volatility3/framework/layers/intel.py @@ -41,6 +41,7 @@ def __init__(self, self._base_layer = self.config["memory_layer"] self._swap_layers: List[str] = [] self._page_map_offset = self.config["page_map_offset"] + self._kernel: Optional[interfaces.context.ModuleInterface] = None # Assign constants self._initial_position = min(self._maxvirtaddr, self._bits_per_register) - 1 @@ -50,6 +51,7 @@ def __init__(self, # These can vary depending on the type of space self._index_shift = int(math.ceil(math.log2(struct.calcsize(self._entry_format)))) + self._structure_position_table: Dict[int, Tuple[str, int, bool]] = {} @classproperty @functools.lru_cache() @@ -102,70 +104,83 @@ def _translate(self, offset: int) -> Tuple[int, int, str]: translated address lives in and the layer_name that the address lives in """ - entry, position = self._translate_entry(offset) + # Setup the entry and how far we are through the offset + # Position maintains the number of bits left to process + # We or with 0x1 to ensure our page_map_offset is always valid + position = self._initial_position + entry = self._initial_entry + + if self.minimum_address > offset > self.maximum_address: + raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry, + "Entry outside virtual address range: " + hex(entry)) + + entry, position = self._translate_entry(offset, position, entry) # Now we're done if not self._page_is_valid(entry): - raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry, - f"Page Fault at entry {hex(entry)} in page entry") + return self._handle_page_fault(self.name, offset, position + 1, entry, f"Page Fault at entry {hex(entry)} in page entry") + page = self._mask(entry, self._maxphyaddr - 1, position + 1) | self._mask(offset, position, 0) return page, 1 << (position + 1), self._base_layer - def _translate_entry(self, offset: int) -> Tuple[int, int]: + def _find_structure_index(self, position: int) -> Tuple[str, int, bool]: + if not self._structure_position_table: + counter = self._initial_position + for name, size, large_page in self._structure: + self._structure_position_table[counter] = name, size, large_page + counter -= size + return self._structure_position_table[position] + + def _handle_page_fault(self, name, offset, invalid_bits, entry, description): + """Handles page faults""" + raise exceptions.PagedInvalidAddressException(self.name, offset, invalid_bits, entry, + "Page Fault at entry " + hex(entry) + " in table " + name + "; " + description) + + def _translate_entry(self, offset: int, position: int, entry: int) -> Tuple[int, int]: """Translates a specific offset based on paging tables. Returns the translated entry value """ - # Setup the entry and how far we are through the offset - # Position maintains the number of bits left to process - # We or with 0x1 to ensure our page_map_offset is always valid - position = self._initial_position - entry = self._initial_entry - - if self.minimum_address > offset > self.maximum_address: - raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry, - "Entry outside virtual address range: " + hex(entry)) - - # Run through the offset in various chunks - for (name, size, large_page) in self._structure: - # Check we're valid - if not self._page_is_valid(entry): - raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry, - "Page Fault at entry " + hex(entry) + " in table " + name) - # Check if we're a large page - if large_page and (entry & (1 << 7)): - # Mask off the PAT bit - if entry & (1 << 12): - entry -= (1 << 12) - # We're a large page, the rest is finished below - # If we want to implement PSE-36, it would need to be done here - break - # Figure out how much of the offset we should be using - start = position - position -= size - index = self._mask(offset, start, position + 1) >> (position + 1) - - # Grab the base address of the table we'll be getting the next entry from - base_address = self._mask(entry, self._maxphyaddr - 1, size + self._index_shift) - - table = self._get_valid_table(base_address) - if table is None: - raise exceptions.PagedInvalidAddressException(self.name, offset, position + 1, entry, - "Page Fault at entry " + hex(entry) + " in table " + name) - - # Read the data for the next entry - entry_data = table[(index << self._index_shift):(index << self._index_shift) + self._entry_size] - - if INTEL_TRANSLATION_DEBUGGING: - vollog.log( - constants.LOGLEVEL_VVVV, "Entry {} at index {} gives data {} as {}".format( - hex(entry), hex(index), hex(struct.unpack(self._entry_format, entry_data)[0]), name)) - - # Read out the new entry from memory - entry, = struct.unpack(self._entry_format, entry_data) - - return entry, position + name, size, large_page = self._find_structure_index(position) + + # Check we're present + if not entry & 0x1: + return self._handle_page_fault(self.name, offset, position + 1, entry, + "Page Fault at entry " + hex(entry) + " in table " + name) + # Check if we're a large page + if large_page and (entry & (1 << 7)): + # We're a large page, the rest is finished below + # If we want to implement PSE-36, it would need to be done here + return entry, position + # Figure out how much of the offset we should be using + start = position + position -= size + index = self._mask(offset, start, position + 1) >> (position + 1) + + # Grab the base address of the table we'll be getting the next entry from + base_address = self._mask(entry, self._maxphyaddr - 1, size + self._index_shift) + + table = self._get_valid_table(base_address) + if table is None: + raise self._handle_page_fault(self.name, offset, position + 1, entry, + "Page Fault at entry " + hex(entry) + " in table " + name) + + # Read the data for the next entry + entry_data = table[(index << self._index_shift):(index << self._index_shift) + self._entry_size] + + if INTEL_TRANSLATION_DEBUGGING: + vollog.log( + constants.LOGLEVEL_VVVV, "Entry {} at index {} gives data {} as {}".format( + hex(entry), hex(index), hex(struct.unpack(self._entry_format, entry_data)[0]), name)) + + # Read out the new entry from memory + entry, = struct.unpack(self._entry_format, entry_data) + + if position < self._page_size_in_bits: + return entry, position + + return self._translate_entry(offset, position, entry) @functools.lru_cache(1025) def _get_valid_table(self, base_address: int) -> Optional[bytes]: @@ -307,6 +322,47 @@ class Intel32e(Intel): class WindowsMixin(Intel): + _swap_bit_offset = 32 + def __init__(self, + context: interfaces.context.ContextInterface, + config_path: str, + name: str, + metadata: Optional[Dict[str, Any]] = None) -> None: + super().__init__(context = context, config_path = config_path, name = name, metadata = metadata) + self._kernel: Optional[interfaces.context.ModuleInterface] = self._get_kernel_module() + + def _get_kernel_module(self) -> Optional[interfaces.context.ModuleInterface]: + kvo = self.config.get('kernel_virtual_offset', None) + if kvo is None: + return None + + for module_name in self.context.modules: + if self.context.modules[module_name].offset == kvo: + return self.context.modules[module_name] + + return None + + @functools.lru_cache() + def _get_invalid_pte_mask(self, kernel): + if kernel.has_symbol("MiInvalidPteMask"): + pte_size = kernel.get_type("_MMPTE_HARDWARE").vol.size + pte_type = "unsigned int" + if pte_size == 8: + pte_type = "unsigned long long" + + return kernel.object(pte_type, offset=kernel.get_symbol("MiInvalidPteMask").address) + + if kernel.has_symbol("MiState"): + system_information = kernel.object("_MI_SYSTEM_INFORMATION", offset=kernel.get_symbol("MiState").address) + return system_information.Hardware.InvalidPteMask + + return 0 + + @functools.lru_cache() + def _get_PageFileLow_shift(self, kernel): + mmpte_software_type = kernel.get_type("_MMPTE_SOFTWARE") + + return mmpte_software_type.child_template("PageFileLow").start_bit @staticmethod def _page_is_valid(entry: int) -> bool: @@ -321,45 +377,43 @@ def _page_is_valid(entry: int) -> bool: """ return bool((entry & 1) or ((entry & 1 << 11) and not entry & 1 << 10)) - def _translate_swap(self, layer: Intel, offset: int, bit_offset: int) -> Tuple[int, int, str]: - try: - return super()._translate(offset) - except exceptions.PagedInvalidAddressException as excp: - entry = excp.entry - tbit = bool(entry & (1 << 11)) - pbit = bool(entry & (1 << 10)) - unknown_bit = bool(entry & (1 << 7)) - n = (entry >> 1) & 0xF - vbit = bool(entry & 1) - if (not tbit and not pbit and not vbit and unknown_bit) and ((entry >> bit_offset) != 0): - swap_offset = entry >> bit_offset << excp.invalid_bits - - if layer.config.get('swap_layers', False): - swap_layer_name = layer.config.get( - interfaces.configuration.path_join('swap_layers', 'swap_layers' + str(n)), None) - if swap_layer_name: - return swap_offset, 1 << excp.invalid_bits, swap_layer_name - raise exceptions.SwappedInvalidAddressException(layer_name = excp.layer_name, - invalid_address = excp.invalid_address, - invalid_bits = excp.invalid_bits, - entry = excp.entry, - swap_offset = swap_offset) - raise + def _handle_page_fault(self, layer_name, offset, invalid_bits, entry, description): + kernel = self._kernel + if kernel is None: + raise exceptions.PagedInvalidAddressException(self.name, offset, invalid_bits, entry, "kernel module not found!") + + tbit = bool(entry & (1 << 11)) + pbit = bool(entry & (1 << 10)) + vbit = bool(entry & 1) + entry ^= self._get_invalid_pte_mask(kernel) + + # Handle Swap failure + if (not tbit and not pbit and not vbit) and ((entry >> self._swap_bit_offset) != 0): + swap_offset = entry >> self._swap_bit_offset << invalid_bits + n = (entry >> self._get_PageFileLow_shift(kernel)) & 0xF + + if self.config.get('swap_layers', False): + swap_layer_name = self.config.get( + interfaces.configuration.path_join('swap_layers', 'swap_layers' + str(n)), None) + if swap_layer_name: + return swap_offset, 1 << invalid_bits, swap_layer_name + raise exceptions.SwappedInvalidAddressException(layer_name = layer_name, + invalid_address = offset, + invalid_bits = invalid_bits, + entry = entry, + swap_offset = swap_offset) + raise super()._handle_page_fault(layer_name, offset, invalid_bits, entry, description) ### These must be full separate classes so that JSON configs re-create them properly class WindowsIntel(WindowsMixin, Intel): - - def _translate(self, offset): - return self._translate_swap(self, offset, self._page_size_in_bits) + pass class WindowsIntelPAE(WindowsMixin, IntelPAE): - - def _translate(self, offset: int) -> Tuple[int, int, str]: - return self._translate_swap(self, offset, self._bits_per_register) + pass class WindowsIntel32e(WindowsMixin, Intel32e): @@ -368,6 +422,3 @@ class WindowsIntel32e(WindowsMixin, Intel32e): # in the PFN field for pages in transition state. # See https://github.com/volatilityfoundation/volatility3/pull/475 _maxphyaddr = 45 - - def _translate(self, offset: int) -> Tuple[int, int, str]: - return self._translate_swap(self, offset, self._bits_per_register // 2)