Skip to content

Commit

Permalink
Merge pull request #1057 from eve-mem/linux_fix_pstree
Browse files Browse the repository at this point in the history
Linux: Update pslist to fix pstree
  • Loading branch information
ikelos authored Dec 14, 2023
2 parents 713b5f9 + e2f7e7e commit 4ac261c
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 61 deletions.
116 changes: 74 additions & 42 deletions volatility3/framework/plugins/linux/pslist.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is Copyright 2021 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
from typing import Any, Callable, Iterable, List
from typing import Any, Callable, Iterable, List, Tuple

from volatility3.framework import interfaces, renderers
from volatility3.framework.configuration import requirements
Expand All @@ -17,7 +17,7 @@ class PsList(interfaces.plugins.PluginInterface):

_required_framework_version = (2, 0, 0)

_version = (2, 1, 0)
_version = (2, 2, 0)

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
Expand Down Expand Up @@ -78,6 +78,72 @@ def filter_func(x):
else:
return lambda _: False

@classmethod
def get_task_fields(
cls, task: interfaces.objects.ObjectInterface, decorate_comm: bool = False
) -> Tuple[int, int, int, str]:
"""Extract the fields needed for the final output
Args:
task: A task object from where to get the fields.
decorate_comm: If True, it decorates the comm string of
- User threads: in curly brackets,
- Kernel threads: in square brackets
Defaults to False.
Returns:
A tuple with the fields to show in the plugin output.
"""
pid = task.tgid
tid = task.pid
ppid = task.parent.tgid if task.parent else 0
name = utility.array_to_string(task.comm)
if decorate_comm:
if task.is_kernel_thread:
name = f"[{name}]"
elif task.is_user_thread:
name = f"{{{name}}}"

task_fields = (task.vol.offset, pid, tid, ppid, name)
return task_fields

def _get_file_output(self, task: interfaces.objects.ObjectInterface) -> str:
"""Extract the elf for the process if requested
Args:
task: A task object to extract from.
Returns:
A string showing the results of the extraction, either
the filename used or an error.
"""
elf_table_name = intermed.IntermediateSymbolTable.create(
self.context,
self.config_path,
"linux",
"elf",
class_types=elf.class_types,
)
proc_layer_name = task.add_process_layer()
if not proc_layer_name:
# if we can't build a proc layer we can't
# extract the elf
return renderers.NotApplicableValue()
else:
# Find the vma that belongs to the main ELF of the process
file_output = "Error outputting file"
for v in task.mm.get_mmap_iter():
if v.vm_start == task.mm.start_code:
file_handle = elfs.Elfs.elf_dump(
self.context,
proc_layer_name,
elf_table_name,
v,
task,
self.open,
)
if file_handle:
file_output = str(file_handle.preferred_filename)
file_handle.close()
break
return file_output

def _generator(
self,
pid_filter: Callable[[Any], bool],
Expand All @@ -104,49 +170,15 @@ def _generator(
for task in self.list_tasks(
self.context, self.config["kernel"], pid_filter, include_threads
):
elf_table_name = intermed.IntermediateSymbolTable.create(
self.context,
self.config_path,
"linux",
"elf",
class_types=elf.class_types,
)
file_output = "Disabled"
if dump:
proc_layer_name = task.add_process_layer()
if not proc_layer_name:
continue

# Find the vma that belongs to the main ELF of the process
file_output = "Error outputting file"

for v in task.mm.get_mmap_iter():
if v.vm_start == task.mm.start_code:
file_handle = elfs.Elfs.elf_dump(
self.context,
proc_layer_name,
elf_table_name,
v,
task,
self.open,
)
if file_handle:
file_output = str(file_handle.preferred_filename)
file_handle.close()
break

pid = task.tgid
tid = task.pid
ppid = task.parent.tgid if task.parent else 0
name = utility.array_to_string(task.comm)
if decorate_comm:
if task.is_kernel_thread:
name = f"[{name}]"
elif task.is_user_thread:
name = f"{{{name}}}"
file_output = self._get_file_output(task)
else:
file_output = "Disabled"

offset, pid, tid, ppid, name = self.get_task_fields(task, decorate_comm)

yield 0, (
format_hints.Hex(task.vol.offset),
format_hints.Hex(offset),
pid,
tid,
ppid,
Expand Down
94 changes: 75 additions & 19 deletions volatility3/framework/plugins/linux/pstree.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,49 @@
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

from volatility3.framework import interfaces, renderers
from volatility3.framework.configuration import requirements
from volatility3.framework.renderers import format_hints
from volatility3.plugins.linux import pslist


class PsTree(pslist.PsList):
class PsTree(interfaces.plugins.PluginInterface):
"""Plugin for listing processes in a tree based on their parent process
ID."""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._tasks = {}
self._levels = {}
self._children = {}
_required_framework_version = (2, 0, 0)

@classmethod
def get_requirements(cls):
# Since we're calling the plugin, make sure we have the plugin's requirements
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=["Intel32", "Intel64"],
),
requirements.PluginRequirement(
name="pslist", plugin=pslist.PsList, version=(2, 2, 0)
),
requirements.ListRequirement(
name="pid",
description="Filter on specific process IDs",
element_type=int,
optional=True,
),
requirements.BooleanRequirement(
name="threads",
description="Include user threads",
optional=True,
default=False,
),
requirements.BooleanRequirement(
name="decorate_comm",
description="Show `user threads` comm in curly brackets, and `kernel threads` comm in square brackets",
optional=True,
default=False,
),
]

def find_level(self, pid: int) -> None:
"""Finds how deep the PID is in the tasks hierarchy.
Expand All @@ -39,29 +70,27 @@ def find_level(self, pid: int) -> None:
self._levels[pid] = level

def _generator(
self, pid_filter, include_threads: bool = False, decorate_com: bool = False
self,
tasks: list,
decorate_comm: bool = False,
):
"""Generates the tasks hierarchy tree.
Args:
pid_filter: A function which takes a process object and returns True if the process should be ignored/filtered
include_threads: If True, the output will also show the user threads
If False, only the thread group leaders will be shown
Defaults to False.
tasks: A list of task objects to be displayed
decorate_comm: If True, it decorates the comm string of
- User threads: in curly brackets,
- Kernel threads: in square brackets
Defaults to False.
Yields:
Each rows
"""
vmlinux = self.context.modules[self.config["kernel"]]
for proc in self.list_tasks(
self.context,
vmlinux.name,
filter_func=pid_filter,
include_threads=include_threads,
):

self._tasks = {}
self._levels = {}
self._children = {}

for proc in tasks:
self._tasks[proc.pid] = proc

# Build the child/level maps
Expand All @@ -71,7 +100,10 @@ def _generator(
def yield_processes(pid):
task = self._tasks[pid]

row = self._get_task_fields(task, decorate_com)
row = pslist.PsList.get_task_fields(task, decorate_comm)
# update the first element, the offset, in the row tuple to use format_hints.Hex
# as a simple int is returned from get_task_fields.
row = (format_hints.Hex(row[0]),) + row[1:]

tid = task.pid
yield (self._levels[tid] - 1, row)
Expand All @@ -82,3 +114,27 @@ def yield_processes(pid):
for pid, level in self._levels.items():
if level == 1:
yield from yield_processes(pid)

def run(self):
filter_func = pslist.PsList.create_pid_filter(self.config.get("pid", None))
include_threads = self.config.get("threads")
decorate_comm = self.config.get("decorate_comm")

return renderers.TreeGrid(
[
("OFFSET (V)", format_hints.Hex),
("PID", int),
("TID", int),
("PPID", int),
("COMM", str),
],
self._generator(
pslist.PsList.list_tasks(
self.context,
self.config["kernel"],
filter_func=filter_func,
include_threads=include_threads,
),
decorate_comm=decorate_comm,
),
)

0 comments on commit 4ac261c

Please sign in to comment.