Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce scatter-gather scatterlists #1338

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions volatility3/framework/symbols/linux/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, *args, **kwargs) -> None:
self.optional_set_type_class("bpf_prog_aux", extensions.bpf_prog_aux)
self.optional_set_type_class("kernel_cap_struct", extensions.kernel_cap_struct)
self.optional_set_type_class("kernel_cap_t", extensions.kernel_cap_t)
self.optional_set_type_class("scatterlist", extensions.scatterlist)

# kernels >= 4.18
self.optional_set_type_class("timespec64", extensions.timespec64)
Expand Down
108 changes: 108 additions & 0 deletions volatility3/framework/symbols/linux/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2410,3 +2410,111 @@ def get_nodes(self) -> Iterator[int]:
"""

yield from self._walk_nodes(root_node=self.rb_node)


class scatterlist(objects.StructType):
SG_CHAIN = 0x01
SG_END = 0x02
SG_PAGE_LINK_MASK = SG_CHAIN | SG_END

def _sg_flags(self) -> int:
return self.page_link & self.SG_PAGE_LINK_MASK

def _sg_is_chain(self) -> int:
return self._sg_flags() & self.SG_CHAIN

def _sg_is_last(self) -> int:
return self._sg_flags() & self.SG_END

def _sg_chain_ptr(self) -> int:
"""Clears the last two bits basically."""
return self.page_link & ~self.SG_PAGE_LINK_MASK

def _sg_dma_len(self) -> int:
# Depends on CONFIG_NEED_SG_DMA_LENGTH
if self.has_member("dma_length"):
return self.dma_length
return self.length

def _get_sg_max_single_alloc(self) -> int:
"""Based on kernel's SG_MAX_SINGLE_ALLOC.

Doc. from kernel source :
* Maximum number of entries that will be allocated in one piece, if
* a list larger than this is required then chaining will be utilized.
"""
return self._context.layers[self.vol.layer_name].page_size // self.vol.size

def _sg_next(self) -> Optional[interfaces.objects.ObjectInterface]:
"""Get the next scatterlist struct from the list.
Based on kernel's sg_next.

Doc. from kernel source :
* Notes on SG table design.
*
* We use the unsigned long page_link field in the scatterlist struct to place
* the page pointer AND encode information about the sg table as well. The two
* lower bits are reserved for this information.
*
* If bit 0 is set, then the page_link contains a pointer to the next sg
* table list. Otherwise the next entry is at sg + 1.
*
* If bit 1 is set, then this sg entry is the last element in a list.
"""
if self._sg_is_last():
return None

if self._sg_is_chain():
next_address = self._sg_chain_ptr()
else:
next_address = self.vol.offset + self.vol.size

sg = self._context.object(
self.get_symbol_table_name() + constants.BANG + "scatterlist",
self.vol.layer_name,
next_address,
)
return sg

def for_each_sg(self) -> Optional[Iterator[interfaces.objects.ObjectInterface]]:
"""Iterate over each struct in the scatterlist."""
sg = self
sg_max_single_alloc = self._get_sg_max_single_alloc()

# Empty scatterlists protection
if sg.page_link == 0 and sg._sg_dma_len() == 0 and sg.dma_address == 0:
return None
else:
# Yield itself first
yield sg

entries_count = 1
# entries_count <= sg_max_single_alloc should always be true if the
# scatterlists were correctly chained.
while entries_count <= sg_max_single_alloc:
sg = sg._sg_next()
if sg is None:
break
# Points to a new scatterlist
elif sg._sg_is_chain():
entries_count = 0
else:
entries_count += 1
yield sg

def get_content(
self,
) -> Optional[Iterator[bytes]]:
"""Traverse a scatterlist to gather content located at each
dma_address position.

Returns:
An iterator of bytes
"""
# Either "physical" is layer-1 because this is a module layer, either "physical" is the current layer
physical_layer_name = self._context.layers[self.vol.layer_name].config.get(
"memory_layer", self.vol.layer_name
)
physical_layer = self._context.layers[physical_layer_name]
for sg in self.for_each_sg():
yield from physical_layer.read(sg.dma_address, sg._sg_dma_len())
Loading