Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add linux sockscan plugin #1120

Draft
wants to merge 12 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion test/test_volatility.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_windows_thrdscan(image, volatility, python):
assert out.find(b"\t4\t8") != -1
assert out.find(b"\t4\t12") != -1
assert out.find(b"\t4\t16") != -1
#assert out.find(b"this raieses AssertionError") != -1
# assert out.find(b"this raieses AssertionError") != -1
assert rc == 0


Expand Down Expand Up @@ -368,6 +368,36 @@ def test_linux_library_list(image, volatility, python):
assert rc == 0


def test_linux_sockscan(image, volatility, python):
# designed for linux-sample-1.dmp SHA1:1C3A4627EDCA94A7ADE3414592BEF0E62D7D3BB6
rc, out, err = runvol_plugin("linux.sockscan.Sockscan", image, volatility, python)

# ensure that multiple unix paths for sockets have been found
assert (
len(
re.findall(
rb"(/[ -~]+?){1,8}",
out,
)
)
>= 10
)

# ensure that multiple IPv4 addresses have been found
assert (
len(
re.findall(
rb"((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)\.?\b){4}",
out,
)
)
>= 10
)

assert out.count(b"\n") >= 50
assert rc == 0


# MAC


Expand Down
353 changes: 353 additions & 0 deletions volatility3/framework/plugins/linux/sockscan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
# This file is Copyright 2024 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
import struct
from typing import List, Set

from volatility3.framework import exceptions, constants
from volatility3.framework.renderers import TreeGrid, NotAvailableValue, format_hints
from volatility3.framework.configuration import requirements
from volatility3.framework.interfaces import plugins
from volatility3.framework.symbols import linux
from volatility3.plugins.linux import sockstat
from volatility3.framework import symbols, constants
from volatility3.framework.layers import scanners

vollog = logging.getLogger(__name__)


class Sockscan(plugins.PluginInterface):
"""Scans for network connections found in memory layer."""

_required_framework_version = (2, 6, 0)

gcmoreira marked this conversation as resolved.
Show resolved Hide resolved
@classmethod
def get_requirements(cls):
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=["Intel32", "Intel64"],
),
requirements.VersionRequirement(
name="SockHandlers", component=sockstat.SockHandlers, version=(1, 0, 0)
),
requirements.VersionRequirement(
name="linuxutils", component=linux.LinuxUtilities, version=(2, 0, 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LinuxUtilities is currently on 2.1.0, check this is still working

),
]

def _canonicalize_symbol_addrs(
self, symbol_table_name: List[str], symbol_names: str
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self, symbol_table_name: str, symbol_names: List[str]

) -> Set[bytes]:
"""Takes a list of symbol names and converts the address of each to the bytes
as they would appear in memory so that they can be scanned for.

Symbols that cannot be found are ignored and not included in the results.

Args:
symbol_table_name: The name of the kernel module on which to operate
symbol_names: A list of symbol names to be looked up

Returns:
A set of bytes which are the packed addresses.
"""
# get vmlinux module from context in order to build objects and read symbols
vmlinux = self.context.modules[symbol_table_name]

# get kernel layer from context so that it's dependencies can be found, and therefore scanned.
# kernel layer will be virtual and built ontop of a physical layer.
kernel_layer = self.context.layers[vmlinux.layer_name]

# detmine if kernel is 64bit or not. The plugin scans for pointers and these need to formated
# to the correct size so that they can be accurately located in the physical layer.
if symbols.symbol_table_is_64bit(self.context, vmlinux.symbol_table_name):
pack_format = "Q" # 64 bit
else:
pack_format = "I" # 32 bit

packed_needles = set()
for symbol_name in symbol_names:
try:
needle_addr = vmlinux.object_from_symbol(symbol_name).vol.offset
except exceptions.SymbolError:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Unable to find symbol {symbol_name} this will not be scanned for.",
)
continue
# use canonicalize to set the appropriate sign extension for the addr
addr = kernel_layer.canonicalize(needle_addr)
packed_addr = struct.pack(pack_format, addr)
packed_needles.add(packed_addr)
vollog.log(
constants.LOGLEVEL_VVVV,
f"Will scan for {symbol_name} using the bytes: {packed_addr.hex()}",
)

# make a warning if no symbols at all could be resolved.
if len(packed_needles) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not packed_needles:

vollog.warning(
f"_canonicalize_symbol_addrs was unable to resolve any symbols, use -vvvv for more information."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not need a f'string here

)

return packed_needles

def _generator(self, symbol_table_name: str):
Copy link
Contributor

@gcmoreira gcmoreira Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this function is a bit too long, any chance to split it in smaller functions? That will also help to document what each subfunction is doing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes will do 👍

"""Scans for sockets. Each row represents a kernel socket.

Args:
symbol_table_name: The name of the kernel module on which to operate

Yields:
addr: Physical offset
family: Socket family string (AF_UNIX, AF_INET, etc)
sock_type: Socket type string (STREAM, DGRAM, etc)
protocol: Protocol string (UDP, TCP, etc)
source addr: Source address string
source port: Source port string (not all of them are int)
destination addr: Destination address string
destination port: Destination port (not all of them are int)
state: State strings (LISTEN, CONNECTED, etc)
"""

# get vmlinux module from context in order to build objects and read symbols
vmlinux = self.context.modules[symbol_table_name]

# get kernel layer from context so that it's dependencies can be found, and therefore scanned.
# kernel layer will be virtual and built ontop of a physical layer.
kernel_layer = self.context.layers[vmlinux.layer_name]

# TODO: Update plugin to support multiple dependencies. e.g. a memory layer and swap file.
# This is a shared problem with psscan and having a generic solution would be useful.

# Find the memory layer to scan, and provide warnings if more than one is located.
if len(kernel_layer.dependencies) > 1:
vollog.warning(
f"Kernel layer depends on multiple layers however only {kernel_layer.dependencies[0]} will be scanned by this plugin."
)
elif len(kernel_layer.dependencies) == 0:
vollog.error(
f"Kernel layer has no dependencies, meaning there is no memory layer for this plugin to scan."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no f'string needed

)
raise exceptions.LayerException(
vmlinux.layer_name, f"Layer {vmlinux.layer_name} has no dependencies"
)
memory_layer_name = kernel_layer.dependencies[0]
memory_layer = self.context.layers[kernel_layer.dependencies[0]]

# use the init process to build a sock handler
# TODO: look into options so that sockstat.SockHandlers so that process_sock can
# be used without a task object.
init_task = vmlinux.object_from_symbol(symbol_name="init_task")
sock_handler = sockstat.SockHandlers(vmlinux, init_task)

# get progress_callback in order to use this in the scanners.
# TODO: perhaps add more detail to progress, showing method in progress and number of hits found
progress_callback = self._progress_callback

# Method 1 - find sockets by file operations, then follow pointers to sockets
file_ops_symbol_names = [
"socket_file_ops",
"sockfs_dentry_operations",
]
file_ops_needles = self._canonicalize_symbol_addrs(
symbol_table_name, file_ops_symbol_names
)
# get file struct to find the offset to the f_op pointer
# this is so that the file object can be created at the correct offset,
# the results of the scanner will be for the f_op member within the file
f_op_offset = vmlinux.get_type("file").members["f_op"][0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vmlinux.get_type("file").relative_child_offset("f_op")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relative_child_offset is nice - thank you!


# Method 2 - find sockets by socket destructor directly inside sock objects
socket_destructor_symbol_names = [
"sock_def_destruct",
"packet_sock_destruct",
"unix_sock_destructor",
"netlink_sock_destruct",
"inet_sock_destruct",
]
socket_destructor_needles = self._canonicalize_symbol_addrs(
symbol_table_name, socket_destructor_symbol_names
)
# get sock struct to find the offset to the sk_destruct pointer
# this is so that the sock object can be created at the correct offset,
# the results of the scanner will be for the sk_destruct member within the scock
sk_destruct_offset = vmlinux.get_type("sock").members["sk_destruct"][0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vmlinux.get_type("sock").relative_child_offset("sk_destruct")


# TODO Method 3 - find sock by sk_error_report symbols
# sk_error_report_symbol_names = ['sock_def_error_report', 'inet_sk_rebuild_header', 'inet_listen']
# this would be similar to Method 2, but using a different pointer within sock.

# add a set of seen addresses to stop possible duplication of results.
seen_sock_physical_addr = set()

# Using the calculated needles, scan the memory layer and attempt to parse the sockets located.
for needle_addr, match in memory_layer.scan(
self.context,
scanners.MultiStringScanner(socket_destructor_needles | file_ops_needles),
progress_callback,
):
psock = None
sock_physical_addr = None

# if match is from socket_destructor_needles simply calculate the offset
# to the sock
if match in socket_destructor_needles:
sock_physical_addr = needle_addr - sk_destruct_offset
psock = self.context.object(
vmlinux.symbol_table_name + constants.BANG + "sock",
offset=sock_physical_addr,
layer_name=memory_layer_name,
native_layer_name=vmlinux.layer_name,
)

# if match is from file_ops_needles attempt to walk from file object to
# the sock
if match in file_ops_needles:
try:
# create file in the memory_layer, the native layer matches the
# kernel so that pointers can be followed
sock_physical_addr = needle_addr - f_op_offset
pfile = self.context.object(
vmlinux.symbol_table_name + constants.BANG + "file",
offset=sock_physical_addr,
layer_name=memory_layer_name,
native_layer_name=vmlinux.layer_name,
)
dentry = pfile.get_dentry()
if not dentry:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping file at {hex(needle_addr)} as unable to locate dentry",
)
continue

d_inode = dentry.d_inode
if not d_inode:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping file at {hex(needle_addr)} as unable to locate inode for dentry",
)
continue

socket_alloc = linux.LinuxUtilities.container_of(
d_inode, "socket_alloc", "vfs_inode", vmlinux
)
socket = socket_alloc.socket
if not (socket and socket.sk):
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping file at {hex(needle_addr)} as socket created by LinuxUtilities.container_of is invalid",
)
continue

# sucessfully trversed from file to sock, this will exist in the
# kernel layer, and need to be translated to the memory layer.
psock = socket.sk.dereference()
except exceptions.InvalidAddressException as error:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Unable to follow file at {hex(needle_addr)} to socket due to invalid address: {error}",
)

if psock is not None and sock_physical_addr not in seen_sock_physical_addr:
seen_sock_physical_addr.add(sock_physical_addr)
try:
sock_type = psock.get_type()

family = psock.get_family()
# remove results with no family
if family is None:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping socket at {hex(sock_physical_addr)} as unable to determin family.",
)
continue

# TODO: invesitgate options for more invalid address handling in proccess_sock
# and the later formatting of it's results.
sock_fields = sock_handler.process_sock(psock)
# if no sock_fields we're able to be extracted then skip this result.
if not sock_fields:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Skipping socket at {hex(sock_physical_addr)} as unable to process with SockHandlers.",
)
continue

sock, sock_stat, extended = sock_fields
src, src_port, dst, dst_port, state = sock_stat
protocol = sock.get_protocol()

# format results
src = NotAvailableValue() if src is None else str(src)
src_port = (
NotAvailableValue() if src_port is None else str(src_port)
)
dst = NotAvailableValue() if dst is None else str(dst)
dst_port = (
NotAvailableValue() if dst_port is None else str(dst_port)
)
state = NotAvailableValue() if state is None else str(state)
protocol = (
NotAvailableValue() if protocol is None else str(protocol)
)
# extended attributes is a dict, so this is formated to string show each
# key and value pair, seperated with a comma.
socket_filter_str = (
",".join(f"{k}={v}" for k, v in extended.items())
if extended
else NotAvailableValue()
)

# remove empty results
if (src == "0.0.0.0" or isinstance(src, NotAvailableValue)) and (
dst == "0.0.0.0" or isinstance(src, NotAvailableValue)
):
if state == "UNCONNECTED":
continue
elif src_port == "0" and dst_port == "0":
continue

fields = (
format_hints.Hex(sock_physical_addr),
family,
sock_type,
protocol,
src,
src_port,
dst,
dst_port,
state,
socket_filter_str,
)

yield (0, fields)
except exceptions.InvalidAddressException as error:
vollog.log(
constants.LOGLEVEL_VVVV,
f"Unable create results for socket at {hex(sock_physical_addr)} due to invalid address: {error}",
)

def run(self):

tree_grid_args = [
("Sock Offset", format_hints.Hex),
("Family", str),
("Type", str),
("Proto", str),
("Source Addr", str),
("Source Port", str),
("Destination Addr", str),
("Destination Port", str),
("State", str),
("Filter", str),
]

return TreeGrid(
tree_grid_args,
self._generator(self.config["kernel"]),
)
Loading