From 75a8ee766485a01ccf2c2bd5337749ca50b23fca Mon Sep 17 00:00:00 2001 From: Artur Hadasz Date: Wed, 20 Nov 2024 17:34:27 +0100 Subject: [PATCH] cache_create: Extended the command to extract from envelope This commit extends the suit-generator cache_create command. It splits it into two commands: from_payloads and from_envelope. from_payload has the same functionality as the old version of the cache_create command from_envelope allows extracting payloads directly from the envelope - also with the possibility to parse the envelope hierarchically and omit certain payloads. Ref: NCSDK-28932 Signed-off-by: Artur Hadasz --- suit_generator/cmd_cache_create.py | 283 +++++++++++++++++++++-------- 1 file changed, 210 insertions(+), 73 deletions(-) diff --git a/suit_generator/cmd_cache_create.py b/suit_generator/cmd_cache_create.py index 3b6feaa..c54d98f 100644 --- a/suit_generator/cmd_cache_create.py +++ b/suit_generator/cmd_cache_create.py @@ -6,104 +6,131 @@ """CMD_CACHE_CREATE CLI command entry point.""" import logging -import cbor2 import math +import cbor2 +import re -log = logging.getLogger(__name__) +from suit_generator.exceptions import GeneratorError CACHE_CREATE_CMD = "cache_create" +CACHE_CREATE_FROM_PAYLOADS_CMD = "from_payloads" +CACHE_CREATE_FROM_ENVELOPE_CMD = "from_envelope" +log = logging.getLogger(__name__) def add_arguments(parser): """Add additional arguments to the passed parser.""" - cmd_cache_create_arg_parser = parser.add_parser(CACHE_CREATE_CMD, help="Create raw cache structure.") + cmd_cache_create = parser.add_parser(CACHE_CREATE_CMD, help="Create raw cache structure.") - cmd_cache_create_arg_parser.add_argument( + cmd_cache_create_subparsers = cmd_cache_create.add_subparsers(dest="cache_create_subcommand", required=True, help="Choose cache_create subcommand") + cmd_cache_create_from_payloads = cmd_cache_create_subparsers.add_parser( + CACHE_CREATE_FROM_PAYLOADS_CMD, help="Create a cache partition from the provided binaries containing raw payloads." + ) + + cmd_cache_create_from_payloads.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.") + cmd_cache_create_from_payloads.add_argument( + "--eb-size", type=int, help="Erase block size in bytes (used for padding).", default=16 + ) + + cmd_cache_create_from_payloads.add_argument( "--input", required=True, action="append", help="Input binary with corresponding URI, passed in format ,." + "Multiple inputs can be passed.", ) - cmd_cache_create_arg_parser.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.") - cmd_cache_create_arg_parser.add_argument( + + cmd_cache_create_from_envelope = cmd_cache_create_subparsers.add_parser( + CACHE_CREATE_FROM_ENVELOPE_CMD, help="Create a cache partition from the payloads inside the provided envelope.") + + cmd_cache_create_from_envelope.add_argument("--output-file", required=True, help="Output raw SUIT DFU cache file.") + cmd_cache_create_from_envelope.add_argument( "--eb-size", type=int, help="Erase block size in bytes (used for padding).", default=16 ) + cmd_cache_create_from_envelope.add_argument( + "--input-envelope", + required=True, + help="Input envelope file path." + ) -def add_padding(data: bytes, eb_size: int) -> bytes: - """ - Add padding to the given data to align it to the specified erase block size. + cmd_cache_create_from_envelope.add_argument( + "--output-envelope", + required=True, + help="Output envelope file path (envelope with removed extracted payloads)." + ) - This function ensures that the data is padded to a size that is a multiple of the erase block size (eb_size). - The padding is done by appending a CBOR key-value pair with empty URI as the key and - byte-string-encoded zeros as the value. + cmd_cache_create_from_envelope.add_argument( + "--omit-payload-regex", + help="Integrated payloads matching the regular expression will not be extracted to the cache." + ) - :param data: The input data to be padded. - :type data: bytes - :param eb_size: The erase block size in bytes. - :type eb_size: int - :return: The padded data. - """ - rounded_up_size = math.ceil(len(data) / eb_size) * eb_size - padding_size = rounded_up_size - len(data) - padded_data = data - - # minimum padding size is 2 bytes - if padding_size == 1: - padding_size += eb_size - rounded_up_size += eb_size - - if padding_size == 0: - return data - - padded_data += bytes([0x60]) - - if padding_size <= 23: - header_len = 2 - padded_data += bytes([0x40 + (padding_size - header_len)]) - elif padding_size <= 0xFFFF: - header_len = 4 - padded_data += bytes([0x59]) + (padding_size - header_len).to_bytes(2, byteorder="big") - else: - raise ValueError("Number of required padding bytes exceeds assumed max size 0xFFFF") + cmd_cache_create_from_envelope.add_argument( + "--dependency-regex", + help="Integrated payloads matching the regular expression will be treated as dependency envelopes and parsed hierarchically." + + "The payloads extracted from the dependency enveloeps will be added to the cache." + ) - return padded_data.ljust(rounded_up_size, b"\x00") +class CachePartition: + def __init__(self, eb_size: int): + self.first_slot = True + self.cache_data = bytes() + self.eb_size = eb_size + def add_padding(self, data: bytes) -> bytes: + """ + Add padding to the given data to align it to the specified erase block size. -def main(input: list[str], output_file: str, eb_size: int) -> None: - """ - Create a raw SUIT DFU cache file from input binaries. - - This function processes a list of input binaries, each associated with a URI, and creates a raw SUIT DFU cache file. - The data is padded to align with the specified erase block size (eb_size). - - :param input: List of input binaries with corresponding URIs, passed in the format ,. - :type input: list[str] - :param output_file: Path to the output raw SUIT DFU cache file. - :type output_file: str - :param eb_size: Erase block size in bytes (used for padding). - :type eb_size: int - :return: None - """ - cache_data = bytes() - first_slot = True + This method ensures that the data is padded to a size that is a multiple of the erase block size (self.eb_size). + The padding is done by appending a CBOR key-value pair with empty URI as the key and + byte-string-encoded zeros as the value. + + :param data: The input data to be padded. + :type data: bytes + :return: The padded data. + """ + rounded_up_size = math.ceil(len(data) / self.eb_size) * self.eb_size + padding_size = rounded_up_size - len(data) + padded_data = data + + # minimum padding size is 2 bytes + if padding_size == 1: + padding_size += self.eb_size + rounded_up_size += self.eb_size + + if padding_size == 0: + return data - for single_input in input: - args = single_input.split(",") - if len(args) < 2: - raise ValueError("Invalid number of input arguments: " + single_input) - uri, input_file = args + padded_data += bytes([0x60]) - data = None - with open(input_file, "rb") as f: - data = f.read() + if padding_size <= 23: + header_len = 2 + padded_data += bytes([0x40 + (padding_size - header_len)]) + elif padding_size <= 0xFFFF: + header_len = 4 + padded_data += bytes([0x59]) + (padding_size - header_len).to_bytes(2, byteorder="big") + else: + raise ValueError("Number of required padding bytes exceeds assumed max size 0xFFFF") + return padded_data.ljust(rounded_up_size, b"\x00") + + def add_cache_slot(self, uri: str, data: bytes): + """ + Add a cache slot to the cache from the given URI and data. + + This function creates a cache slot from the given URI and data, and pads the data to align with the specified + erase block size (eb_size). The first slot in the cache is created with indefinite length CBOR map. + + :param uri: The URI associated with the data. + :type uri: str + :param data: The data to be included in the cache slot. + :type data: bytes + """ slot_data = bytes() - if first_slot: + if self.first_slot: # Open the cache - it is an indefinite length CBOR map (0xBF) slot_data = bytes([0xBF]) - first_slot = False + self.first_slot = False # uri as key slot_data += cbor2.dumps(uri) @@ -111,11 +138,121 @@ def main(input: list[str], output_file: str, eb_size: int) -> None: # Size must be encoded in 4 bytes, thus cannot use cbor2.dumps slot_data += bytes([0x5A]) + len(data).to_bytes(4, byteorder="big") + data # Add padding for single slot - slot_data = add_padding(slot_data, eb_size) + slot_data = self.add_padding(slot_data) + + self.cache_data += slot_data + + def close_and_save_cache(self, output_file: str): + """ + Close the cache and save it to the specified output file. + + This function closes the cache by adding the end-of-map byte (0xFF) and saves the cache to the specified output + file. + + :param output_file: Path to the output raw SUIT DFU cache file. + :type output_file: str + """ + self.cache_data += bytes([0xFF]) + with open(output_file, "wb") as f: + f.write(self.cache_data) + +class CacheFromPayloads: + def fill_cache_from_payloads(cache: CachePartition, input: list[str]) -> None: + """ + Process list of input binaries, each associated with a URI, and fill the SUIT DFU cache with the data + + :param cache: CachePartition object to fill with the data + :param input: List of input binaries with corresponding URIs, passed in the format , + """ + + for single_input in input: + args = single_input.split(",") + if len(args) < 2: + raise ValueError("Invalid number of input arguments: " + single_input) + uri, input_file = args + + with open(input_file, "rb") as f: + data = f.read() + + cache.add_cache_slot(uri, data) + +class CacheFromEnvelope: + def fill_cache_from_envelope_data(cache: CachePartition, + envelope_data: bytes, + omit_payload_regex: str, + dependency_regex: str) -> bytes: + """ + Fill the cache partition with data from the payloads inside the provided envelope binary data. + This function is called recursively for dependency envelopes. - cache_data += slot_data + :param cache: CachePartition object to fill with the data + :param envelope_data: Binary data of the envelope to extract the payloads from + :param omit_payload_regex: Integrated payloads matching the regular expression will not be extracted to the cache + :param dependency_regex: Integrated payloads matching the regular expression will be treated as dependency envelopes + """ + envelope = cbor2.loads(envelope_data) - cache_data += bytes([0xFF]) # Finish the indefinite length map + if isinstance(envelope.value, dict): + integrated = [k for k in envelope.value.keys() if isinstance(k, str)] + else: + raise GeneratorError("The provided envelope/dependency envelope is not a valid envelope!") + + if not dependency_regex is None: + integrated_dependencies = [k for k in integrated if not re.fullmatch(dependency_regex, k) is None] + for dep in integrated_dependencies: + integrated.remove(dep) + + if omit_payload_regex is None: + payloads_to_extract = integrated + else: + payloads_to_extract = [k for k in integrated if re.fullmatch(omit_payload_regex, k) is None] + + for payload in payloads_to_extract: + cache.add_cache_slot(payload, envelope.value.pop(payload)) + + for dependency in integrated_dependencies: + new_dependency_data = CacheFromEnvelope.fill_cache_from_envelope_data( + cache, envelope.value[dependency], omit_payload_regex, dependency_regex) + envelope.value[dependency] = new_dependency_data + + return cbor2.dumps(envelope) + + def fill_cache_from_envelope(cache: CachePartition, + input_envelope: str, + output_envelope: str, + omit_payload_regex: str, + dependency_regex: str) -> None: + """ + Fill the cache partition with data from the payloads inside the provided envelope file. + param cache: CachePartition object to fill with the data + param input_envelope: Path to the input envelope file + param output_envelope: Path to the output envelope file (envelope with removed extracted payloads) + param omit_payload_regex: Integrated payloads matching the regular expression will not be extracted to the cache + param dependency_regex: Integrated payloads matching the regular expression will be treated as dependency envelopes + """ + with open(input_envelope, "rb") as fh: + data = fh.read() + output_envelope_data = CacheFromEnvelope.fill_cache_from_envelope_data(cache, + data, omit_payload_regex, dependency_regex) + with open(output_envelope, "wb") as fh: + fh.write(output_envelope_data) + +def main(**kwargs) -> None: + """ + Create a raw SUIT DFU cache file. + """ + + cache = CachePartition(kwargs["eb_size"]) + + if kwargs["cache_create_subcommand"] == CACHE_CREATE_FROM_PAYLOADS_CMD: + CacheFromPayloads.fill_cache_from_payloads(cache, kwargs["input"]) + elif kwargs["cache_create_subcommand"] == CACHE_CREATE_FROM_ENVELOPE_CMD: + CacheFromEnvelope.fill_cache_from_envelope(cache, + kwargs["input_envelope"], + kwargs["output_envelope"], + kwargs["omit_payload_regex"], + kwargs["dependency_regex"]) + else: + raise GeneratorError(f"Invalid 'cache_create' subcommand: {kwargs['cache_create_subcommand']}") - with open(output_file, "wb") as f: - f.write(cache_data) + cache.close_and_save_cache(kwargs["output_file"])