cache_create: Extended the command to extract from envelope
This commit extends the suit-generator cache_create command.

It splits the command into two subcommands: from_payloads and from_envelope.

from_payloads provides the same functionality as the old version of
the cache_create command.

from_envelope extracts payloads directly from an envelope, with the
option to parse the envelope hierarchically (following dependency
envelopes) and to omit selected payloads.
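
Example invocations (illustrative; the URIs, file paths, and erase
block size are hypothetical):

  suit-generator cache_create from_payloads \
    --input "#app.bin,app.bin" --input "#radio.bin,radio.bin" \
    --eb-size 4096 --output-file dfu_cache.bin

  suit-generator cache_create from_envelope \
    --input-envelope root.suit --output-envelope root_stripped.suit \
    --dependency-regex ".*\.suit" --omit-payload-regex "#manifest.*" \
    --eb-size 4096 --output-file dfu_cache.bin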

Ref: NCSDK-28932

Signed-off-by: Artur Hadasz <[email protected]>
ahasztag committed Nov 21, 2024
1 parent 7bf6a6b commit 75a8ee7
Showing 1 changed file with 210 additions and 73 deletions.
283 changes: 210 additions & 73 deletions suit_generator/cmd_cache_create.py
@@ -6,116 +6,253 @@
"""CMD_CACHE_CREATE CLI command entry point."""

import logging
import cbor2
import math
import cbor2
import re

log = logging.getLogger(__name__)
from suit_generator.exceptions import GeneratorError

CACHE_CREATE_CMD = "cache_create"
CACHE_CREATE_FROM_PAYLOADS_CMD = "from_payloads"
CACHE_CREATE_FROM_ENVELOPE_CMD = "from_envelope"

log = logging.getLogger(__name__)

def add_arguments(parser):
    """Add additional arguments to the passed parser."""
    cmd_cache_create = parser.add_parser(CACHE_CREATE_CMD, help="Create raw cache structure.")

    cmd_cache_create_subparsers = cmd_cache_create.add_subparsers(
        dest="cache_create_subcommand", required=True, help="Choose cache_create subcommand"
    )
    cmd_cache_create_from_payloads = cmd_cache_create_subparsers.add_parser(
        CACHE_CREATE_FROM_PAYLOADS_CMD,
        help="Create a cache partition from the provided binaries containing raw payloads.",
    )

    cmd_cache_create_from_payloads.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.")
    cmd_cache_create_from_payloads.add_argument(
        "--eb-size", type=int, help="Erase block size in bytes (used for padding).", default=16
    )

    cmd_cache_create_from_payloads.add_argument(
        "--input",
        required=True,
        action="append",
        help="Input binary with corresponding URI, passed in format <URI>,<INPUT_FILE_PATH>. "
        + "Multiple inputs can be passed.",
    )

    cmd_cache_create_from_envelope = cmd_cache_create_subparsers.add_parser(
        CACHE_CREATE_FROM_ENVELOPE_CMD,
        help="Create a cache partition from the payloads inside the provided envelope.",
    )

    cmd_cache_create_from_envelope.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.")
    cmd_cache_create_from_envelope.add_argument(
        "--eb-size", type=int, help="Erase block size in bytes (used for padding).", default=16
    )

    cmd_cache_create_from_envelope.add_argument(
        "--input-envelope",
        required=True,
        help="Input envelope file path.",
    )

    cmd_cache_create_from_envelope.add_argument(
        "--output-envelope",
        required=True,
        help="Output envelope file path (envelope with removed extracted payloads).",
    )

    cmd_cache_create_from_envelope.add_argument(
        "--omit-payload-regex",
        help="Integrated payloads matching the regular expression will not be extracted to the cache.",
    )

    cmd_cache_create_from_envelope.add_argument(
        "--dependency-regex",
        help="Integrated payloads matching the regular expression will be treated as dependency envelopes "
        + "and parsed hierarchically. The payloads extracted from the dependency envelopes will be added to the cache.",
    )

class CachePartition:
    def __init__(self, eb_size: int):
        self.first_slot = True
        self.cache_data = bytes()
        self.eb_size = eb_size

    def add_padding(self, data: bytes) -> bytes:
        """
        Add padding to the given data to align it to the specified erase block size.

        This method ensures that the data is padded to a size that is a multiple of the erase block size
        (self.eb_size). The padding is done by appending a CBOR key-value pair with an empty URI as the key
        and byte-string-encoded zeros as the value.

        :param data: The input data to be padded.
        :type data: bytes
        :return: The padded data.
        """
        rounded_up_size = math.ceil(len(data) / self.eb_size) * self.eb_size
        padding_size = rounded_up_size - len(data)
        padded_data = data

        # The minimum padding size is 2 bytes; if only 1 byte is missing, pad a whole additional erase block.
        if padding_size == 1:
            padding_size += self.eb_size
            rounded_up_size += self.eb_size

        if padding_size == 0:
            return data

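        # The padding entry is itself a CBOR key-value pair: 0x60 is a zero-length text string (the empty
        # URI key); the value is a byte string of zeros. For paddings of up to 23 bytes the byte-string
        # header fits in one byte (0x40 + length), giving a 2-byte entry header in total; larger paddings
        # use 0x59 followed by a 16-bit length, giving a 4-byte header.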
        padded_data += bytes([0x60])

        if padding_size <= 23:
            header_len = 2
            padded_data += bytes([0x40 + (padding_size - header_len)])
        elif padding_size <= 0xFFFF:
            header_len = 4
            padded_data += bytes([0x59]) + (padding_size - header_len).to_bytes(2, byteorder="big")
        else:
            raise ValueError("Number of required padding bytes exceeds assumed max size 0xFFFF")

        return padded_data.ljust(rounded_up_size, b"\x00")

    def add_cache_slot(self, uri: str, data: bytes):
        """
        Add a cache slot to the cache from the given URI and data.

        This method creates a cache slot from the given URI and data, and pads it to align with the
        configured erase block size (self.eb_size). The first slot additionally opens the cache with an
        indefinite-length CBOR map header.

        :param uri: The URI associated with the data.
        :type uri: str
        :param data: The data to be included in the cache slot.
        :type data: bytes
        """
        slot_data = bytes()
        if self.first_slot:
            # Open the cache - it is an indefinite length CBOR map (0xBF)
            slot_data = bytes([0xBF])
            self.first_slot = False

        # uri as key
        slot_data += cbor2.dumps(uri)

        # Size must be encoded in 4 bytes, thus cannot use cbor2.dumps
        slot_data += bytes([0x5A]) + len(data).to_bytes(4, byteorder="big") + data
        # Add padding for single slot
        slot_data = self.add_padding(slot_data)

        self.cache_data += slot_data

    def close_and_save_cache(self, output_file: str):
        """
        Close the cache and save it to the specified output file.

        This method closes the cache by adding the end-of-map byte (0xFF) and saves the cache to the
        specified output file.

        :param output_file: Path to the output raw SUIT DFU cache file.
        :type output_file: str
        """
        self.cache_data += bytes([0xFF])
        with open(output_file, "wb") as f:
            f.write(self.cache_data)
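
    # Resulting partition layout (illustrative sketch, two slots):
    #
    #   0xBF                                        ; indefinite-length CBOR map, opened by the first slot
    #     <tstr uri_0> 0x5A <4-byte size> <data_0>  ; slot 0, followed by a padding entry
    #     <tstr uri_1> 0x5A <4-byte size> <data_1>  ; slot 1, followed by a padding entry
    #   0xFF                                        ; closes the map (close_and_save_cache)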

class CacheFromPayloads:
    def fill_cache_from_payloads(cache: CachePartition, input: list[str]) -> None:
        """
        Process a list of input binaries, each associated with a URI, and fill the SUIT DFU cache with the data.

        :param cache: CachePartition object to fill with the data
        :param input: List of input binaries with corresponding URIs, passed in the format <URI>,<INPUT_FILE_PATH>
        """
        for single_input in input:
            args = single_input.split(",")
            if len(args) < 2:
                raise ValueError("Invalid number of input arguments: " + single_input)
            uri, input_file = args

            with open(input_file, "rb") as f:
                data = f.read()

            cache.add_cache_slot(uri, data)
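
    # For example, --input "#file.bin,build/file.bin" (hypothetical URI and path) creates a slot keyed by
    # the URI "#file.bin" whose value is the content of build/file.bin.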

class CacheFromEnvelope:
    def fill_cache_from_envelope_data(
        cache: CachePartition, envelope_data: bytes, omit_payload_regex: str, dependency_regex: str
    ) -> bytes:
        """
        Fill the cache partition with data from the payloads inside the provided envelope binary data.

        This function is called recursively for dependency envelopes.

        :param cache: CachePartition object to fill with the data
        :param envelope_data: Binary data of the envelope to extract the payloads from
        :param omit_payload_regex: Integrated payloads matching the regular expression will not be extracted
                                   to the cache
        :param dependency_regex: Integrated payloads matching the regular expression will be treated as
                                 dependency envelopes
        :return: The re-encoded envelope with the extracted payloads removed
        """
        envelope = cbor2.loads(envelope_data)

        if isinstance(envelope.value, dict):
            integrated = [k for k in envelope.value.keys() if isinstance(k, str)]
        else:
            raise GeneratorError("The provided envelope/dependency envelope is not a valid envelope!")

        integrated_dependencies = []
        if dependency_regex is not None:
            integrated_dependencies = [k for k in integrated if re.fullmatch(dependency_regex, k) is not None]
            for dep in integrated_dependencies:
                integrated.remove(dep)

        if omit_payload_regex is None:
            payloads_to_extract = integrated
        else:
            payloads_to_extract = [k for k in integrated if re.fullmatch(omit_payload_regex, k) is None]

        for payload in payloads_to_extract:
            cache.add_cache_slot(payload, envelope.value.pop(payload))

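        # Dependency envelopes are parsed recursively: their integrated payloads are moved into the cache,
        # and the stripped dependency envelope is re-embedded in the parent.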
        for dependency in integrated_dependencies:
            new_dependency_data = CacheFromEnvelope.fill_cache_from_envelope_data(
                cache, envelope.value[dependency], omit_payload_regex, dependency_regex
            )
            envelope.value[dependency] = new_dependency_data

        return cbor2.dumps(envelope)

    def fill_cache_from_envelope(
        cache: CachePartition,
        input_envelope: str,
        output_envelope: str,
        omit_payload_regex: str,
        dependency_regex: str,
    ) -> None:
        """
        Fill the cache partition with data from the payloads inside the provided envelope file.

        :param cache: CachePartition object to fill with the data
        :param input_envelope: Path to the input envelope file
        :param output_envelope: Path to the output envelope file (envelope with removed extracted payloads)
        :param omit_payload_regex: Integrated payloads matching the regular expression will not be extracted
                                   to the cache
        :param dependency_regex: Integrated payloads matching the regular expression will be treated as
                                 dependency envelopes
        """
        with open(input_envelope, "rb") as fh:
            data = fh.read()
        output_envelope_data = CacheFromEnvelope.fill_cache_from_envelope_data(
            cache, data, omit_payload_regex, dependency_regex
        )
        with open(output_envelope, "wb") as fh:
            fh.write(output_envelope_data)

def main(**kwargs) -> None:
    """Create a raw SUIT DFU cache file."""
    cache = CachePartition(kwargs["eb_size"])

    if kwargs["cache_create_subcommand"] == CACHE_CREATE_FROM_PAYLOADS_CMD:
        CacheFromPayloads.fill_cache_from_payloads(cache, kwargs["input"])
    elif kwargs["cache_create_subcommand"] == CACHE_CREATE_FROM_ENVELOPE_CMD:
        CacheFromEnvelope.fill_cache_from_envelope(
            cache,
            kwargs["input_envelope"],
            kwargs["output_envelope"],
            kwargs["omit_payload_regex"],
            kwargs["dependency_regex"],
        )
    else:
        raise GeneratorError(f"Invalid 'cache_create' subcommand: {kwargs['cache_create_subcommand']}")

    cache.close_and_save_cache(kwargs["output_file"])
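
# A minimal programmatic usage sketch, mirroring the CLI dispatch above (file names and erase block size
# are hypothetical):
#
#   from suit_generator import cmd_cache_create
#
#   cmd_cache_create.main(
#       cache_create_subcommand="from_envelope",
#       input_envelope="root.suit",
#       output_envelope="root_stripped.suit",
#       omit_payload_regex=None,
#       dependency_regex=None,
#       eb_size=4096,
#       output_file="dfu_cache.bin",
#   )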
