diff --git a/test/test_volatility.py b/test/test_volatility.py index aaad615bcc..348c161da4 100644 --- a/test/test_volatility.py +++ b/test/test_volatility.py @@ -6,12 +6,12 @@ # import os +import re import subprocess import sys import shutil import tempfile import hashlib -import ntpath import json # @@ -124,11 +124,7 @@ def test_windows_dumpfiles(image, volatility, python): known_files = json.load(json_file) failed_chksms = 0 - - if sys.platform == "win32": - file_name = ntpath.basename(image) - else: - file_name = os.path.basename(image) + file_name = os.path.basename(image) try: for addr in known_files["windows_dumpfiles"][file_name]: @@ -331,6 +327,36 @@ def test_linux_tty_check(image, volatility, python): assert rc == 0 +def test_linux_ip_addr(image, volatility, python): + rc, out, err = runvol_plugin("linux.ip.Addr", image, volatility, python) + + assert re.search( + rb"2\s+eth0\s+00:0c:29:8f:ed:ca\s+False\s+192.168.201.161\s+24\s+global\s+UP", + out, + ) + assert re.search( + rb"2\s+eth0\s+00:0c:29:8f:ed:ca\s+False\s+fe80::20c:29ff:fe8f:edca\s+64\s+link\s+UP", + out, + ) + assert out.count(b"\n") >= 8 + assert rc == 0 + + +def test_linux_ip_link(image, volatility, python): + rc, out, err = runvol_plugin("linux.ip.Link", image, volatility, python) + + assert re.search( + rb"-\s+lo\s+00:00:00:00:00:00\s+UNKNOWN\s+16436\s+noqueue\s+0\s+LOOPBACK,LOWER_UP,UP", + out, + ) + assert re.search( + rb"-\s+eth0\s+00:0c:29:8f:ed:ca\s+UP\s+1500\s+pfifo_fast\s+1000\s+BROADCAST,LOWER_UP,MULTICAST,UP", + out, + ) + assert out.count(b"\n") >= 6 + assert rc == 0 + + # MAC diff --git a/volatility3/framework/constants/linux/__init__.py b/volatility3/framework/constants/linux/__init__.py index 6e8883f195..b5972ca519 100644 --- a/volatility3/framework/constants/linux/__init__.py +++ b/volatility3/framework/constants/linux/__init__.py @@ -5,6 +5,7 @@ Linux-specific values that aren't found in debug symbols """ +from enum import Enum KERNEL_NAME = "__kernel__" @@ -281,3 +282,49 @@ ) ELF_MAX_EXTRACTION_SIZE = 1024 * 1024 * 1024 * 4 - 1 + +# For IFA_* below - Ref: include/net/ipv6.h +IPV6_ADDR_LOOPBACK = 0x0010 +IPV6_ADDR_LINKLOCAL = 0x0020 +IPV6_ADDR_SITELOCAL = 0x0040 +# For inet6_ifaddr - Ref: include/net/if_inet6.h +IFA_HOST = IPV6_ADDR_LOOPBACK +IFA_LINK = IPV6_ADDR_LINKLOCAL +IFA_SITE = IPV6_ADDR_SITELOCAL + +# Only for kernels < 3.15 when the net_device_flags enum didn't exist +# ref include/uapi/linux/if.h +NET_DEVICE_FLAGS = { + "IFF_UP": 0x1, + "IFF_BROADCAST": 0x2, + "IFF_DEBUG": 0x4, + "IFF_LOOPBACK": 0x8, + "IFF_POINTOPOINT": 0x10, + "IFF_NOTRAILERS": 0x20, + "IFF_RUNNING": 0x40, + "IFF_NOARP": 0x80, + "IFF_PROMISC": 0x100, + "IFF_ALLMULTI": 0x200, + "IFF_MASTER": 0x400, + "IFF_SLAVE": 0x800, + "IFF_MULTICAST": 0x1000, + "IFF_PORTSEL": 0x2000, + "IFF_AUTOMEDIA": 0x4000, + "IFF_DYNAMIC": 0x8000, + "IFF_LOWER_UP": 0x10000, + "IFF_DORMANT": 0x20000, + "IFF_ECHO": 0x40000, +} + + +# Kernels >= 2.6.17. See IF_OPER_* in include/uapi/linux/if.h +class IF_OPER_STATES(Enum): + """RFC 2863 - Network interface operational status""" + + UNKNOWN = 0 + NOTPRESENT = 1 + DOWN = 2 + LOWERLAYERDOWN = 3 + TESTING = 4 + DORMANT = 5 + UP = 6 diff --git a/volatility3/framework/interfaces/objects.py b/volatility3/framework/interfaces/objects.py index ab568b927a..b742671976 100644 --- a/volatility3/framework/interfaces/objects.py +++ b/volatility3/framework/interfaces/objects.py @@ -133,7 +133,7 @@ def __init__( def __getattr__(self, attr: str) -> Any: """Method for ensuring volatility members can be returned.""" - raise AttributeError + raise AttributeError() @property def vol(self) -> ReadOnlyMapping: diff --git a/volatility3/framework/plugins/linux/ip.py b/volatility3/framework/plugins/linux/ip.py new file mode 100644 index 0000000000..6b523379bc --- /dev/null +++ b/volatility3/framework/plugins/linux/ip.py @@ -0,0 +1,148 @@ +# This file is Copyright 2023 Volatility Foundation and licensed under the Volatility Software License 1.0 +# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0 +# + +from typing import List +from volatility3.framework import interfaces, renderers, constants +from volatility3.framework.configuration import requirements +from volatility3.framework.interfaces import plugins + + +class Addr(plugins.PluginInterface): + """Lists network interface information for all devices""" + + _required_framework_version = (2, 0, 0) + + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Linux kernel", + architectures=["Intel32", "Intel64"], + ), + ] + + def _gather_net_dev_info(self, net_dev): + mac_addr = net_dev.get_mac_address() + promisc = net_dev.promisc + operational_state = net_dev.get_operational_state() + iface_name = net_dev.get_device_name() + iface_ifindex = net_dev.ifindex + try: + net_ns_id = net_dev.get_net_namespace_id() + except AttributeError: + net_ns_id = renderers.NotAvailableValue() + + # Interface IPv4 Addresses + in_device = net_dev.ip_ptr.dereference().cast("in_device") + for in_ifaddr in in_device.get_addresses(): + prefix_len = in_ifaddr.get_prefix_len() + scope_type = in_ifaddr.get_scope_type() + ip_addr = in_ifaddr.get_address() + yield net_ns_id, iface_ifindex, iface_name, mac_addr, promisc, ip_addr, prefix_len, scope_type, operational_state + + # Interface IPv6 Addresses + inet6_dev = net_dev.ip6_ptr.dereference().cast("inet6_dev") + for inet6_ifaddr in inet6_dev.get_addresses(): + prefix_len = inet6_ifaddr.get_prefix_len() + scope_type = inet6_ifaddr.get_scope_type() + ip6_addr = inet6_ifaddr.get_address() + yield net_ns_id, iface_ifindex, iface_name, mac_addr, promisc, ip6_addr, prefix_len, scope_type, operational_state + + def _generator(self): + vmlinux = self.context.modules[self.config["kernel"]] + + net_type_symname = vmlinux.symbol_table_name + constants.BANG + "net" + net_device_symname = vmlinux.symbol_table_name + constants.BANG + "net_device" + + # 'net_namespace_list' exists from kernels >= 2.6.24 + net_namespace_list = vmlinux.object_from_symbol("net_namespace_list") + for net_ns in net_namespace_list.to_list(net_type_symname, "list"): + for net_dev in net_ns.dev_base_head.to_list(net_device_symname, "dev_list"): + for fields in self._gather_net_dev_info(net_dev): + yield 0, fields + + def run(self): + headers = [ + ("NetNS", int), + ("Index", int), + ("Interface", str), + ("MAC", str), + ("Promiscuous", bool), + ("IP", str), + ("Prefix", int), + ("Scope Type", str), + ("State", str), + ] + + return renderers.TreeGrid(headers, self._generator()) + + +class Link(plugins.PluginInterface): + """Lists information about network interfaces similar to `ip link show`""" + + _required_framework_version = (2, 0, 0) + _version = (1, 0, 0) + + @classmethod + def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]: + return [ + requirements.ModuleRequirement( + name="kernel", + description="Linux kernel", + architectures=["Intel32", "Intel64"], + ) + ] + + def _gather_net_dev_link_info(self, net_device): + mac_addr = net_device.get_mac_address() + operational_state = net_device.get_operational_state() + iface_name = net_device.get_device_name() + mtu = net_device.mtu + qdisc_name = net_device.get_qdisc_name() + qlen = net_device.get_queue_length() + try: + net_ns_id = net_device.get_net_namespace_id() + except AttributeError: + net_ns_id = renderers.NotAvailableValue() + + # Format flags to string. Drop IFF_ to match iproute2 'ip link' output. + # Also, note that iproute2 removes IFF_RUNNING, see print_link_flags() + flags_list = [ + flag.replace("IFF_", "") + for flag in net_device.get_flag_names() + if flag != "IFF_RUNNING" + ] + flags_str = ",".join(flags_list) + + yield net_ns_id, iface_name, mac_addr, operational_state, mtu, qdisc_name, qlen, flags_str + + def _generator(self): + vmlinux = self.context.modules[self.config["kernel"]] + + net_type_symname = vmlinux.symbol_table_name + constants.BANG + "net" + net_device_symname = vmlinux.symbol_table_name + constants.BANG + "net_device" + + # 'net_namespace_list' exists from kernels >= 2.6.24 + net_namespace_list = vmlinux.object_from_symbol("net_namespace_list") + for net_ns in net_namespace_list.to_list(net_type_symname, "list"): + for net_dev in net_ns.dev_base_head.to_list(net_device_symname, "dev_list"): + for fields in self._gather_net_dev_link_info(net_dev): + yield 0, fields + + def run(self): + headers = [ + ("NS", int), + ("Interface", str), + ("MAC", str), + ("State", str), + ("MTU", int), + ("Qdisc", str), + ("Qlen", int), + ("Flags", str), + ] + + return renderers.TreeGrid(headers, self._generator()) diff --git a/volatility3/framework/symbols/linux/__init__.py b/volatility3/framework/symbols/linux/__init__.py index c4e2587f49..10cc546b77 100644 --- a/volatility3/framework/symbols/linux/__init__.py +++ b/volatility3/framework/symbols/linux/__init__.py @@ -43,6 +43,11 @@ def __init__(self, *args, **kwargs) -> None: # Network self.set_type_class("net", extensions.net) + self.set_type_class("net_device", extensions.net_device) + self.set_type_class("in_device", extensions.in_device) + self.set_type_class("in_ifaddr", extensions.in_ifaddr) + self.set_type_class("inet6_dev", extensions.inet6_dev) + self.set_type_class("inet6_ifaddr", extensions.inet6_ifaddr) self.set_type_class("socket", extensions.socket) self.set_type_class("sock", extensions.sock) self.set_type_class("inet_sock", extensions.inet_sock) diff --git a/volatility3/framework/symbols/linux/extensions/__init__.py b/volatility3/framework/symbols/linux/extensions/__init__.py index d73d0cfb9a..a1b8a3e839 100644 --- a/volatility3/framework/symbols/linux/extensions/__init__.py +++ b/volatility3/framework/symbols/linux/extensions/__init__.py @@ -5,7 +5,7 @@ import collections.abc import logging import socket as socket_module -from typing import Generator, Iterable, Iterator, Optional, Tuple, List +from typing import Generator, Iterable, Iterator, Optional, Tuple, List, Dict from volatility3.framework import constants from volatility3.framework.constants.linux import SOCK_TYPES, SOCK_FAMILY @@ -13,11 +13,15 @@ from volatility3.framework.constants.linux import TCP_STATES, NETLINK_PROTOCOLS from volatility3.framework.constants.linux import ETH_PROTOCOLS, BLUETOOTH_STATES from volatility3.framework.constants.linux import BLUETOOTH_PROTOCOLS, SOCKET_STATES -from volatility3.framework.constants.linux import CAPABILITIES +from volatility3.framework.constants.linux import CAPABILITIES, NET_DEVICE_FLAGS +from volatility3.framework.constants.linux import IFA_HOST, IFA_LINK, IFA_SITE +from volatility3.framework.constants.linux import IF_OPER_STATES +from volatility3.framework.renderers import conversion, UnparsableValue from volatility3.framework import exceptions, objects, interfaces, symbols from volatility3.framework.layers import linear from volatility3.framework.objects import utility from volatility3.framework.symbols import generic, linux, intermed +from volatility3.framework.symbols.wrappers import Flags from volatility3.framework.symbols.linux.extensions import elf vollog = logging.getLogger(__name__) @@ -1213,6 +1217,15 @@ def get_mount_points(self): class net(objects.StructType): def get_inode(self): + """Get the namespace id for this network namespace. + + Raises: + AttributeError: If it cannot find the network namespace id for the + current kernel. + + Returns: + int: the namespace id + """ if self.has_member("proc_inum"): # 3.8.13 <= kernel < 3.19.8 return self.proc_inum @@ -1224,6 +1237,337 @@ def get_inode(self): raise AttributeError("Unable to find net_namespace inode") +class net_device(objects.StructType): + def get_device_name(self) -> str: + """Return the network device name + + Returns: + str: The network device name + """ + return utility.array_to_string(self.name) + + def _format_as_mac_address(self, hwaddr): + return ":".join([f"{x:02x}" for x in hwaddr[: self.addr_len]]) + + def get_mac_address(self) -> str: + """Get the MAC address of this network interface. + + Returns: + str: the MAC address of this network interface. + """ + if self.has_member("perm_addr"): + null_mac_addr_bytes = b"\x00" * self.addr_len + null_mac_addr = self._format_as_mac_address(null_mac_addr_bytes) + mac_addr = self._format_as_mac_address(self.perm_addr) + if mac_addr != null_mac_addr: + return mac_addr + + parent_layer = self._context.layers[self.vol.layer_name] + try: + hwaddr = parent_layer.read(self.dev_addr, self.addr_len, pad=True) + except exceptions.InvalidAddressException: + vollog.debug( + f"Unable to read network inteface mac address from {self.dev_addr:#x}" + ) + return None + + return self._format_as_mac_address(hwaddr) + + def _get_flag_choices(self) -> Dict: + """Return the net_device flags as a list of strings""" + vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self) + try: + # kernels >= 3.15 + net_device_flags_enum = vmlinux.get_enumeration("net_device_flags") + choices = net_device_flags_enum.choices + except exceptions.SymbolError: + # kernels < 3.15 + choices = NET_DEVICE_FLAGS + + return choices + + def _get_net_device_flag_value(self, name): + """Return the net_device flag value based on the flag name""" + return self._get_flag_choices().get(name, UnparsableValue()) + + def _get_netdev_state_t(self): + vmlinux = linux.LinuxUtilities.get_module_from_volobj_type(self._context, self) + try: + # At least from kernels 2.6.30 + return vmlinux.get_enumeration("netdev_state_t") + except exceptions.SymbolError: + raise exceptions.VolatilityException( + "Unsupported kernel or wrong ISF. Cannot find 'netdev_state_t' enumeration" + ) + + def is_running(self) -> bool: + """Test if the network device has been brought up + Based on netif_running() + + Returns: + bool: True if the device is UP + """ + netdev_state_t_enum = self._get_netdev_state_t() + + # It should be safe. netdev_state_t::__LINK_STATE_START has been available since + # at least kernels 2.6.30 + return ( + self.state & (1 << netdev_state_t_enum.choices["__LINK_STATE_START"]) != 0 + ) + + def is_carrier_ok(self) -> bool: + """Check if carrier is present on network device + Based on netif_carrier_ok() + + Returns: + bool: True if carrier present + """ + netdev_state_t_enum = self._get_netdev_state_t() + + # It should be safe. netdev_state_t::__LINK_STATE_NOCARRIER has been available + # since at least kernels 2.6.30 + return ( + self.state & (1 << netdev_state_t_enum.choices["__LINK_STATE_NOCARRIER"]) + == 0 + ) + + def is_dormant(self) -> bool: + """Check if the network device is dormant + Based on netif_dormant(() + + Returns: + bool: True if the network device is dormant + """ + netdev_state_t_enum = self._get_netdev_state_t() + + # It should be safe. netdev_state_t::__LINK_STATE_DORMANT has been available + # since at least kernels 2.6.30 + return ( + self.state & (1 << netdev_state_t_enum.choices["__LINK_STATE_DORMANT"]) != 0 + ) + + def is_operational(self) -> bool: + """Test if the carrier is operational + Based on netif_oper_up() + + Returns: + bool: True if the device is UP + """ + + return self.get_operational_state() in ("UP", "UNKNOWN") + + def get_flag_names(self) -> List[str]: + """Return the net_device flags as a list of strings. + This is the combination of flags exported through kernel APIs to userspace. + Based on dev_get_flags() + + Returns: + List[str]: A list of flag names + """ + choices = self._get_flag_choices() + clear_flags = choices.get("IFF_PROMISC", 0) + clear_flags |= choices.get("IFF_ALLMULTI", 0) + clear_flags |= choices.get("IFF_RUNNING", 0) + clear_flags |= choices.get("IFF_LOWER_UP", 0) + clear_flags |= choices.get("IFF_DORMANT", 0) + + clear_gflags = choices.get("IFF_PROMISC", 0) + clear_gflags |= choices.get("IFF_ALLMULTI)", 0) + + flags = (self.flags & ~clear_flags) | (self.gflags & ~clear_gflags) + + if self.is_running(): + if self.is_operational(): + flags |= choices.get("IFF_RUNNING", 0) + if self.is_carrier_ok(): + flags |= choices.get("IFF_LOWER_UP", 0) + if self.is_dormant(): + flags |= choices.get("IFF_DORMANT", 0) + + net_device_flags_enum_flags = Flags(choices) + net_device_flags = net_device_flags_enum_flags(flags) + + # It's preferable to provide a deterministic list of items. i.e. for testing + return sorted(net_device_flags) + + @property + def promisc(self): + """Return if this network interface is in promiscuous mode. + + Returns: + bool: True if this network interface is in promiscuous mode. Otherwise, False + """ + return self.flags & self._get_net_device_flag_value("IFF_PROMISC") != 0 + + def get_net_namespace_id(self) -> int: + """Return the network namespace id for this network interface. + + Returns: + int: the network namespace id for this network interface + """ + nd_net = self.nd_net + if nd_net.has_member("net"): + # In kernel 4.1.52 the 'nd_net' member type was changed from + # 'struct net*' to 'possible_net_t' which has a 'struct net *net' member. + net_ns_id = nd_net.net.get_inode() + else: + # In kernels < 4.1.52 the 'nd_net'member type was 'struct net*' + net_ns_id = nd_net.get_inode() + + return net_ns_id + + def get_operational_state(self) -> str: + """Return the netwok device oprational state (RFC 2863) string + + Returns: + str: A string with the operational state + """ + try: + return IF_OPER_STATES(self.operstate).name + except ValueError: + vollog.warning(f"Invalid net_device operational state '{self.operstate}'") + return UnparsableValue() + + def get_qdisc_name(self) -> str: + """Return the network device queuing discipline (qdisc) name + + Returns: + str: A string with the queuing discipline (qdisc) name + """ + return utility.array_to_string(self.qdisc.ops.id) + + def get_queue_length(self) -> int: + """Return the netwrok device transmision qeueue length (qlen) + + Returns: + int: the netwrok device transmision qeueue length (qlen) + """ + return self.tx_queue_len + + +class in_device(objects.StructType): + def get_addresses(self): + """Yield the IPv4 ifaddr addresses + + Yields: + in_ifaddr: An IPv4 ifaddr address + """ + cur = self.ifa_list + while cur and cur.vol.offset: + yield cur + cur = cur.ifa_next + + +class inet6_dev(objects.StructType): + def get_addresses(self): + """Yield the IPv6 ifaddr addresses + + Yields: + inet6_ifaddr: An IPv6 ifaddr address + """ + if not self.has_member( + "addr_list" + ) or not self.addr_list.vol.type_name.endswith(constants.BANG + "list_head"): + # kernels < 3.0 + # FIXME: struct inet6_ifaddr *addr_list; + vollog.warning( + "IPv6 is unsupported for this kernel. Check if the ISF contains the appropriate 'inet6_dev' type" + ) + return + + symbol_space = self._context.symbol_space + table_name = self.get_symbol_table_name() + inet6_ifaddr_symname = table_name + constants.BANG + "inet6_ifaddr" + if not symbol_space.has_type(inet6_ifaddr_symname) or not symbol_space.get_type( + inet6_ifaddr_symname + ).has_member("if_list"): + vollog.warning( + "IPv6 is unsupported for this kernel. Check if the ISF contains the appropriate 'inet6_ifaddr' type" + ) + return + + # 'if_list' member was added to 'inet6_ifaddr' type in kernels 3.0 + for inet6_ifaddr in self.addr_list.to_list(inet6_ifaddr_symname, "if_list"): + yield inet6_ifaddr + + +class in_ifaddr(objects.StructType): + # Translation to text based on iproute2 package. See 'rtnl_rtscope_tab' in lib/rt_names.c + _rtnl_rtscope_tab = { + "RT_SCOPE_UNIVERSE": "global", + "RT_SCOPE_NOWHERE": "nowhere", + "RT_SCOPE_HOST": "host", + "RT_SCOPE_LINK": "link", + "RT_SCOPE_SITE": "site", + } + + def get_scope_type(self): + """Get the scope type for this IPv4 address + + Returns: + str: the IPv4 scope type. + """ + table_name = self.get_symbol_table_name() + rt_scope_enum = self._context.symbol_space.get_enumeration( + table_name + constants.BANG + "rt_scope_t" + ) + try: + rt_scope = rt_scope_enum.lookup(self.ifa_scope) + except ValueError: + return "unknown" + + return self._rtnl_rtscope_tab.get(rt_scope, "unknown") + + def get_address(self): + """Get an string with the IPv4 address + + Returns: + str: the IPv4 address + """ + return conversion.convert_ipv4(self.ifa_address) + + def get_prefix_len(self): + """Get the IPv4 address prefix len + + Returns: + int: the IPv4 address prefix len + """ + return self.ifa_prefixlen + + +class inet6_ifaddr(objects.StructType): + def get_scope_type(self): + """Get the scope type for this IPv6 address + + Returns: + str: the IPv6 scope type. + """ + if (self.scope & IFA_HOST) != 0: + return "host" + elif (self.scope & IFA_LINK) != 0: + return "link" + elif (self.scope & IFA_SITE) != 0: + return "site" + else: + return "global" + + def get_address(self): + """Get an string with the IPv6 address + + Returns: + str: the IPv6 address + """ + return conversion.convert_ipv6(self.addr.in6_u.u6_addr32) + + def get_prefix_len(self): + """Get the IPv6 address prefix len + + Returns: + int: the IPv6 address prefix len + """ + return self.prefix_len + + class socket(objects.StructType): def _get_vol_kernel(self): symbol_table_arr = self.vol.type_name.split("!", 1)