Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Crashdump writer plugin #718

Open
wants to merge 17 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions volatility3/framework/layers/physical.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@ def __init__(self,
config_path: str,
name: str,
buffer: bytes,
metadata: Optional[Dict[str, Any]] = None) -> None:
metadata: Optional[Dict[str, Any]] = None,
offset: int = 0) -> None:
super().__init__(context = context, config_path = config_path, name = name, metadata = metadata)
self._buffer = buffer
self._offset = offset

@property
def maximum_address(self) -> int:
"""Returns the largest available address in the space."""
return len(self._buffer) - 1
return self.minimum_address + len(self._buffer) - 1

@property
def minimum_address(self) -> int:
"""Returns the smallest available address in the space."""
return 0
return self._offset

def is_valid(self, offset: int, length: int = 1) -> bool:
"""Returns whether the offset is valid or not."""
Expand All @@ -48,11 +50,13 @@ def read(self, address: int, length: int, pad: bool = False) -> bytes:
invalid_address = self.maximum_address + 1
raise exceptions.InvalidAddressException(self.name, invalid_address,
"Offset outside of the buffer boundaries")
return self._buffer[address:address + length]
real_address = address - self.minimum_address
return self._buffer[real_address:real_address + length]

def write(self, address: int, data: bytes):
"""Writes the data from to the buffer."""
self._buffer = self._buffer[:address] + data + self._buffer[address + len(data):]
real_address = address - self.minimum_address
self._buffer = self._buffer[:real_address] + data + self._buffer[real_address + len(data):]

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
Expand Down
100 changes: 95 additions & 5 deletions volatility3/framework/plugins/windows/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,33 @@
#

import time
import struct
from typing import List, Tuple, Iterable

from volatility3.framework import constants, interfaces, layers, symbols
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.interfaces import plugins, configuration
paulkermann marked this conversation as resolved.
Show resolved Hide resolved
from volatility3.framework.renderers import TreeGrid
from volatility3.framework.symbols import intermed
from volatility3.framework.symbols.windows import extensions

Check notice

Code scanning / CodeQL

Unused import Note

Import of 'extensions' is not used.
from volatility3.framework.layers import physical

def rol(value: int, count: int, max_bits: int = 64) -> int:
"""A rotate-left instruction in Python"""
max_bits_mask = (1 << max_bits) - 1
return (value << count % max_bits) & max_bits_mask | \
((value & max_bits_mask) >> (max_bits - (count % max_bits)))

def bswap_32(value: int) -> int:
value = ((value << 8) & 0xFF00FF00) | ((value >> 8) & 0x00FF00FF)

return ((value << 16) | (value >> 16)) & 0xffffffff

def bswap_64(value: int) -> int:
low = bswap_32((value >> 32))
high = bswap_32((value & 0xFFFFFFFF))

return ((high << 32) | low) & 0xffffffffffffffff

class Info(plugins.PluginInterface):
"""Show OS & kernel details of the memory sample being analyzed."""
Expand Down Expand Up @@ -64,18 +82,17 @@ def get_kernel_module(cls, context: interfaces.context.ContextInterface, layer_n
return ntkrnlmp

@classmethod
def get_kdbg_structure(cls, context: interfaces.context.ContextInterface, config_path: str, layer_name: str,
symbol_table: str) -> interfaces.objects.ObjectInterface:
def get_raw_kdbg_structure(cls, context: interfaces.context.ContextInterface, config_path: str, layer_name: str,
symbol_table: str) -> interfaces.objects.ObjectInterface:
"""Returns the KDDEBUGGER_DATA64 structure for a kernel"""
ntkrnlmp = cls.get_kernel_module(context, layer_name, symbol_table)

native_types = context.symbol_space[symbol_table].natives

kdbg_offset = ntkrnlmp.get_symbol("KdDebuggerDataBlock").address

kdbg_table_name = intermed.IntermediateSymbolTable.create(context,
interfaces.configuration.path_join(
config_path, 'kdbg'),
config_path, 'kdbg'),
"windows",
"kdbg",
native_types = native_types,
Expand All @@ -87,6 +104,78 @@ def get_kdbg_structure(cls, context: interfaces.context.ContextInterface, config

return kdbg

@classmethod
def is_kdbg_encoded(cls, context: interfaces.context.ContextInterface, layer_name: str, symbol_table: str) -> bool:
ntkrnlmp = cls.get_kernel_module(context, layer_name, symbol_table)
if not ntkrnlmp.has_symbol("KdpDataBlockEncoded"):
return False

KdpDataBlockEncoded_value = ntkrnlmp.object("char", offset=ntkrnlmp.get_symbol("KdpDataBlockEncoded").address)

return KdpDataBlockEncoded_value != 0

@classmethod
def _decode_encoded_kdbg_bytes(cls, context: interfaces.context.ContextInterface , offset: int,
size: int, layer_name: str, symbol_table: str) -> bytes:

kernel = cls.get_kernel_module(context, layer_name, symbol_table)
wait_never = kernel.object("unsigned long long", offset=kernel.get_symbol("KiWaitNever").address)
wait_always = kernel.object("unsigned long long", offset=kernel.get_symbol("KiWaitAlways").address)
datablockencoded = kernel.object("char", offset=kernel.get_symbol("KdpDataBlockEncoded").address)

decoded_buffer = b""
encoded_array = kernel.object(object_type="array", subtype=kernel.get_type("unsigned long long"), offset=offset, layer_name=layer_name, count=(size // 8), absolute=True)
for entry in encoded_array:
low_byte = (wait_never) & 0xFF
entry = rol(entry ^ wait_never, low_byte)
# TODO: remove the OR after #702 gets merged in.
swap_xor = datablockencoded.vol.offset | 0xFFFF000000000000
entry = bswap_64(entry ^ swap_xor)
decoded_buffer += struct.pack("Q", entry ^ wait_always)

return decoded_buffer

@classmethod
def _create_structure_from_bytes(cls, context: interfaces.context.ContextInterface,
structure_bytes: bytes, structure_symbol_table: str, structure_name: str,
offset: int, native_layer_name: str):

new_layer_name = f"{structure_name}_{offset}"
new_layer = physical.BufferDataLayer(context,
configuration.path_join(new_layer_name, 'layer'),
name = new_layer_name,
buffer = structure_bytes, offset=offset)
context.layers.add_layer(new_layer)

return context.object(f"{structure_symbol_table}{constants.BANG}{structure_name}", layer_name=new_layer.name,
offset=offset, native_layer_name=native_layer_name)


@classmethod
def get_kdbg_structure(cls, context: interfaces.context.ContextInterface, config_path: str, layer_name: str,
symbol_table: str) -> interfaces.objects.ObjectInterface:
kernel = cls.get_kernel_module(context, layer_name, symbol_table)
Fixed Show fixed Hide fixed
kdbg = cls.get_raw_kdbg_structure(context, config_path, layer_name, symbol_table)
primary = context.layers[kernel.layer_name]
Fixed Show fixed Hide fixed
tag_value = struct.pack("I", kdbg.Header.OwnerTag)
is_kdbg_encoded = cls.is_kdbg_encoded(context, layer_name, symbol_table)

if not is_kdbg_encoded and tag_value == b"KDBG":
return kdbg

kdbg_symbol_table_name = kdbg.get_symbol_table_name()
kdbg_symbol_table = context.symbol_space[kdbg_symbol_table_name]
header_size = kdbg_symbol_table.get_type("_DBGKD_DEBUG_DATA_HEADER64").size
decoded_header_bytes = cls._decode_encoded_kdbg_bytes(context, kdbg.vol.offset, header_size, layer_name, symbol_table)
decoded_header = cls._create_structure_from_bytes(context, decoded_header_bytes, kdbg_symbol_table_name, "_DBGKD_DEBUG_DATA_HEADER64", kdbg.vol.offset, layer_name)

kdbg_size = decoded_header.Size
decoded_kdbg_bytes = cls._decode_encoded_kdbg_bytes(context, kdbg.vol.offset, kdbg_size, layer_name, symbol_table)

decoded_kdbg = cls._create_structure_from_bytes(context, decoded_kdbg_bytes, kdbg_symbol_table_name, "_KDDEBUGGER_DATA64", kdbg.vol.offset, layer_name)
return decoded_kdbg


@classmethod
def get_kuser_structure(cls, context: interfaces.context.ContextInterface, layer_name: str,
symbol_table: str) -> interfaces.objects.ObjectInterface:
Expand Down Expand Up @@ -166,6 +255,7 @@ def _generator(self):
for i, layer in self.get_depends(self.context, layer_name):
yield (0, (layer.name, f"{i} {layer.__class__.__name__}"))

yield (0, ("IsKDBGEncoded", str(bool(self.is_kdbg_encoded(self.context, layer_name, symbol_table)))))
if kdbg.Header.OwnerTag == 0x4742444B:

yield (0, ("KdDebuggerDataBlock", hex(kdbg.vol.offset)))
Expand Down
143 changes: 143 additions & 0 deletions volatility3/framework/plugins/windows/writecrashdump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from typing import List, Type, Iterator, Tuple
import random, string

from volatility3.framework import interfaces, renderers, constants, exceptions
Fixed Show fixed Hide fixed
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import configuration, plugins
from volatility3.framework.layers import physical, intel
from volatility3.framework.symbols import intermed
from volatility3.plugins.windows import info

class WriteCrashDump(plugins.PluginInterface):
"""Runs the automagics and writes the output to a crashdump format file"""

_required_framework_version = (2, 0, 0)
_version = (2, 0, 0)

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.ModuleRequirement(name = 'kernel', description = 'Windows kernel',
architectures = ["Intel32", "Intel64"]),
requirements.VersionRequirement("info", component = info.Info, version = (1, 0, 0))
]

@classmethod
def get_physical_layer_name(cls, context: interfaces.context.ContextInterface, vlayer: interfaces.layers.DataLayerInterface) -> str:
return context.config.get(interfaces.configuration.path_join(vlayer.config_path, 'memory_layer'), None)

@classmethod
def write_crashdump(cls, kernel: interfaces.context.ModuleInterface, open_method: Type[interfaces.plugins.FileHandlerInterface],
progress_callback: constants.ProgressCallback = None) -> str:
layer_name = kernel.layer_name
context = kernel.context
symbol_table = kernel.symbol_table_name
primary = context.layers[layer_name]
is_pae = isinstance(primary, intel.IntelPAE)
is_64_bit = isinstance(primary, intel.Intel32e)

if is_64_bit:
crashdump_json = 'crash64'
dump_header_name = '_DUMP_HEADER64'
valid_dump_suffix = [ord('6'), ord('4')]
else:
crashdump_json = 'crash'
dump_header_name = '_DUMP_HEADER'
valid_dump_suffix = [ord('M'), ord('P')]

config_path = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(8))

crash_table_name = intermed.IntermediateSymbolTable.create(context,
configuration.path_join(config_path, 'symbols'),
'',
crashdump_json)

dump_header_type = context.symbol_space[crash_table_name].get_type(dump_header_name)
header_layer_name = context.layers.free_layer_name('header_layer')
header_layer = physical.BufferDataLayer(context,
configuration.path_join(config_path, 'layer'),
name = header_layer_name,
buffer = b"PAGE" * (dump_header_type.size // 4))
context.layers.add_layer(header_layer)
dump_header = context.object(dump_header_type, header_layer.name, 0)

info_config_path = configuration.path_join(config_path, 'info')
kdbg = info.Info.get_kdbg_structure(context, info_config_path, layer_name, symbol_table)

kuser = info.Info.get_kuser_structure(context, layer_name, symbol_table)
version_structure = info.Info.get_version_structure(context, layer_name, symbol_table)

dump_header.ValidDump.write([ord('D'), ord('U')] + valid_dump_suffix)

# TODO: remove the OR after #702 gets merged in.
dump_header.KdDebuggerDataBlock.write(kdbg.vol.offset | (0 if not is_64_bit else 0xFFFF000000000000))
dump_header.MajorVersion.write(version_structure.MajorVersion)
dump_header.MinorVersion.write(version_structure.MinorVersion)
dump_header.MachineImageType.write(version_structure.MachineType)

number_processors = kernel.object("unsigned int", offset=kernel.get_symbol("KeNumberProcessors").address)
dump_header.NumberProcessors.write(number_processors)

dump_header.DirectoryTableBase.write(primary.config['page_map_offset'])
if dump_header.has_member("PaeEnabled"):
dump_header.PaeEnabled.write(int(is_pae))

dump_header.PfnDataBase.write(kdbg.MmPfnDatabase)
dump_header.PsLoadedModuleList.write(kdbg.PsLoadedModuleList)
dump_header.PsActiveProcessHead.write(kdbg.PsActiveProcessHead)
dump_header.DumpType.write(1) # DUMP_TYPE_FULL - Run-based dump.

dump_header.SystemTime.write(kuser.SystemTime.cast('unsigned long long'))

dump_header.BugCheckCode.write(0)
dump_header.BugCheckCodeParameter.write([0, 0, 0, 0])

blank_len = dump_header.Exception.vol.offset - dump_header.ContextRecord.vol.offset
header_layer.write(dump_header.ContextRecord.vol.offset, b"\x00" * blank_len)

dump_header.Comment.write(
bytes("Volatility 3 {} generated crashdump file\x00".format(constants.PACKAGE_VERSION),
'latin-1'))

# Write the actual data
virtual_layer = context.layers[layer_name]
physical_layer_name = cls.get_physical_layer_name(context, virtual_layer)
physical_layer = context.layers[physical_layer_name]

page_count = (physical_layer.maximum_address - physical_layer.minimum_address) // 0x1000
dump_header.PhysicalMemoryBlockBuffer.NumberOfRuns.write(1)
dump_header.PhysicalMemoryBlockBuffer.NumberOfPages.write(page_count)
run0 = dump_header.PhysicalMemoryBlockBuffer.Run[0]
run0.BasePage.write(physical_layer.minimum_address)
run0.PageCount.write(page_count)

dump_header.RequiredDumpSpace.write((page_count + 2) * 0x1000)

filename = "crash.dmp"
# We don't try any form of compression, but just write the data as one large run
with open_method(filename) as f:
filename = f.preferred_filename
# We want to include the maxmium address
header_data = header_layer.read(0, header_layer.maximum_address + 1)
f.write(header_data)
for offset in range(physical_layer.minimum_address, physical_layer.maximum_address + 1, 0x1000):
paulkermann marked this conversation as resolved.
Show resolved Hide resolved
paulkermann marked this conversation as resolved.
Show resolved Hide resolved
if offset & 0xffffff == 0:
if progress_callback:
progress_callback((offset * 100) / (physical_layer.maximum_address + 1), "Reading memory")
f.write(physical_layer.read(offset, 0x1000, pad = True))

# Fix KDBG in the dump if it was encoded
decoded_data = context.layers[kdbg.vol.layer_name].read(kdbg.vol.offset, kdbg.Header.Size)
kdbg_physical_address = primary.translate(kdbg.vol.offset)[0]
kdbg_file_location = (header_layer.maximum_address + 1) + kdbg_physical_address - physical_layer.minimum_address
f.seek(kdbg_file_location)
f.write(decoded_data)

return filename

def _generator(self) -> Iterator[Tuple]:
filename = self.write_crashdump(self.context.modules[self.config['kernel']], self._file_handler, self._progress_callback)
yield 0, ('Done', filename)

def run(self) -> renderers.TreeGrid:
return renderers.TreeGrid([("Status", str), ("Output File name", str)], self._generator())
6 changes: 4 additions & 2 deletions volatility3/framework/symbols/windows/extensions/kdbg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
from volatility3.framework import constants
from volatility3.framework import objects


class KDDEBUGGER_DATA64(objects.StructType):

def get_build_lab(self):
"""Returns the NT build lab string from the KDBG."""

layer_name = self.vol.layer_name
if self.vol.native_layer_name:
paulkermann marked this conversation as resolved.
Show resolved Hide resolved
layer_name = self.vol.native_layer_name
symbol_table_name = self.get_symbol_table_name()

return self._context.object(symbol_table_name + constants.BANG + "string",
Expand All @@ -24,6 +25,8 @@ def get_csdversion(self):
"""Returns the CSDVersion as an integer (i.e. Service Pack number)"""

layer_name = self.vol.layer_name
if self.vol.native_layer_name:
layer_name = self.vol.native_layer_name
symbol_table_name = self.get_symbol_table_name()

csdresult = self._context.object(symbol_table_name + constants.BANG + "unsigned long",
Expand All @@ -32,5 +35,4 @@ def get_csdversion(self):

return (csdresult >> 8) & 0xffffffff


class_types = {'_KDDEBUGGER_DATA64': KDDEBUGGER_DATA64}