From 926fbf896c0ba1a48e3419022101cd22dbc4caa0 Mon Sep 17 00:00:00 2001 From: kim pham Date: Thu, 17 Oct 2024 21:38:13 +0100 Subject: [PATCH] linter for box reader after logging --- src/jp2_remediator/box_reader.py | 121 +++++++++++++++++++++++-------- 1 file changed, 91 insertions(+), 30 deletions(-) diff --git a/src/jp2_remediator/box_reader.py b/src/jp2_remediator/box_reader.py index 1a827ab..67a3efd 100644 --- a/src/jp2_remediator/box_reader.py +++ b/src/jp2_remediator/box_reader.py @@ -4,7 +4,7 @@ import boto3 import datetime from jp2_remediator import configure_logger -from jpylyzer import jpylyzer +# from jpylyzer import jpylyzer # from jpylyzer import jpylyzer from jpylyzer import boxvalidator @@ -51,7 +51,9 @@ def check_boxes(self): b"\x6a\x70\x32\x68" ) # search hex for 'jp2h' if jp2h_position != -1: - self.logger.debug(f"'jp2h' found at byte position: {jp2h_position}") + self.logger.debug( + f"'jp2h' found at byte position: {jp2h_position}" + ) else: self.logger.debug("'jp2h' not found in the file.") @@ -59,7 +61,9 @@ def check_boxes(self): b"\x63\x6f\x6c\x72" ) # search hex for 'colr' if colr_position != -1: - self.logger.debug(f"'colr' found at byte position: {colr_position}") + self.logger.debug( + f"'colr' found at byte position: {colr_position}" + ) else: self.logger.debug("'colr' not found in the file.") @@ -70,19 +74,41 @@ def check_boxes(self): def process_colr_box(self, colr_position): """Processes the 'colr' box to determine header offset position.""" if colr_position != -1: - self.logger.debug(f"'colr' found at byte position: {colr_position}") - meth_byte_position = colr_position + 4 # ISO/IEC 15444-1:2019(E) Figure I.10 colr specification box, byte position of METH value after 'colr' + self.logger.debug( + f"'colr' found at byte position: {colr_position}" + ) + meth_byte_position = colr_position + 4 + """ISO/IEC 15444-1:2019(E) Figure I.10 colr specification box + byte position of METH value after 'colr' """ meth_value = self.file_contents[meth_byte_position] - self.logger.debug(f"'meth' value: {meth_value} at byte position: {meth_byte_position}") + self.logger.debug(f"""'meth' value: { + meth_value + } at byte position: { + meth_byte_position + }""") if meth_value == 1: - header_offset_position = meth_byte_position + 7 # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, if meth is 1 then color profile starts at byte position 7 after 'colr' - self.logger.debug(f"'meth' is 1, setting header_offset_position to: {header_offset_position}") + header_offset_position = meth_byte_position + 7 + """ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, + if meth is 1 then color profile starts at + byte position 7 after 'colr'""" + self.logger.debug( + f"""'meth' is 1, setting header_offset_position to: { + header_offset_position + }""") elif meth_value == 2: - header_offset_position = meth_byte_position + 3 # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, if meth is 2 then color profile (ICC profile) starts at byte position 3 after 'colr' - self.logger.debug(f"'meth' is 2, setting header_offset_position to: {header_offset_position} (start of ICC profile)") + header_offset_position = meth_byte_position + 3 + """ ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, + if meth is 2 then color profile (ICC profile) starts + at byte position 3 after 'colr'""" + self.logger.debug(f"""'meth' is 2, setting + header_offset_position to: { + header_offset_position + } (start of ICC profile)""") else: - self.logger.debug(f"'meth' value {meth_value} is not recognized (must be 1 or 2).") + self.logger.debug(f"""'meth' value { + meth_value + } is not recognized (must be 1 or 2).""") header_offset_position = None else: self.logger.debug("'colr' not found in the file.") @@ -101,22 +127,32 @@ def process_trc_tag(self, self.logger.debug(f"'{trc_name}' not found in the file.") return new_contents - self.logger.debug(f"'{trc_name}' found at byte position: {trc_position}") - trc_tag_entry = new_contents[trc_position:trc_position + 12] # 12-byte tag entry length + self.logger.debug(f"""'{trc_name}' found at byte position: { + trc_position + }""") + trc_tag_entry = new_contents[trc_position:trc_position + 12] + # 12-byte tag entry length if len(trc_tag_entry) != 12: - self.logger.debug(f"Could not extract the full 12-byte '{trc_name}' tag entry.") + self.logger.debug(f"""Could not extract the full 12-byte '{ + trc_name + }' tag entry.""") return new_contents - trc_tag_signature = trc_tag_entry[0:4] # ICC.1:2022 Table 24 tag signature, e.g. 'rTRC' - trc_tag_offset = int.from_bytes(trc_tag_entry[4:8], byteorder='big') # ICC.1:2022 Table 24 tag offset - trc_tag_size = int.from_bytes(trc_tag_entry[8:12], byteorder='big') # ICC.1:2022 Table 24 tag size + trc_tag_signature = trc_tag_entry[0:4] + # ICC.1:2022 Table 24 tag signature, e.g. 'rTRC' + trc_tag_offset = int.from_bytes(trc_tag_entry[4:8], byteorder='big') + # ICC.1:2022 Table 24 tag offset + trc_tag_size = int.from_bytes(trc_tag_entry[8:12], byteorder='big') + # ICC.1:2022 Table 24 tag size self.logger.debug(f"'{trc_name}' Tag Signature: {trc_tag_signature}") self.logger.debug(f"'{trc_name}' Tag Offset: {trc_tag_offset}") self.logger.debug(f"'{trc_name}' Tag Size: {trc_tag_size}") if header_offset_position is None: - self.logger.debug(f"Cannot calculate 'curv_{trc_name}_position' due to an unrecognized 'meth' value.") + self.logger.debug(f"""Cannot calculate 'curv_{ + trc_name + }_position' due to an unrecognized 'meth' value.""") return new_contents curv_trc_position = ( @@ -127,7 +163,10 @@ def process_trc_tag(self, ] # 12-byte curv profile data length if len(curv_profile) < 12: - self.logger.debug(f"Could not read the full 'curv' profile data for {trc_name}.") + self.logger.debug( + f"""Could not read the full 'curv' profile data for { + trc_name + }.""") return new_contents curv_signature = curv_profile[0:4].decode( @@ -140,25 +179,45 @@ def process_trc_tag(self, curv_profile[8:12], byteorder="big" ) # # ICC.1:2022 Table 35 n value - self.logger.debug(f"'curv' Profile Signature for {trc_name}: {curv_signature}") + self.logger.debug(f"""'curv' Profile Signature for {trc_name}: { + curv_signature + }""") self.logger.debug(f"'curv' Reserved Value: {curv_reserved}") - self.logger.debug(f"'curv_{trc_name}_gamma_n' Value: {curv_trc_gamma_n}") - curv_trc_field_length = curv_trc_gamma_n * 2 + 12 # ICC.1:2022 Table 35 2n field length - self.logger.debug(f"'curv_{trc_name}_field_length': {curv_trc_field_length}") + self.logger.debug( + f"'curv_{trc_name}_gamma_n' Value: {curv_trc_gamma_n}" + ) + curv_trc_field_length = curv_trc_gamma_n * 2 + 12 + # ICC.1:2022 Table 35 2n field length + self.logger.debug(f"""'curv_{trc_name}_field_length': { + curv_trc_field_length + }""") """Check if curv_trc_gamma_n is not 1 and ask for confirmation to proceed, loops through all TRC tags""" if curv_trc_gamma_n != 1: - self.logger.warning(f"Warning: 'curv_{trc_name}_gamma_n' value is {curv_trc_gamma_n}, expected 1.") - proceed = input(f"Do you want to proceed with fixing the file {self.file_path}? (y/n): ").lower() + self.logger.warning(f"""Warning: 'curv_{ + trc_name + }_gamma_n' value is { + curv_trc_gamma_n + }, expected 1.""") + proceed = input(f"""Do you want to proceed with fixing the file { + self.file_path} (y/n)?: """).lower() if proceed != 'y': print(f"Skipping fixing for {self.file_path}") return new_contents if trc_tag_size != curv_trc_field_length: - self.logger.warning(f"'{trc_name}' Tag Size ({trc_tag_size}) does not match 'curv_{trc_name}_field_length' ({curv_trc_field_length}). Modifying the size...") - new_trc_size_bytes = curv_trc_field_length.to_bytes(4, byteorder='big') - new_contents[trc_position + 8: trc_position + 12] = new_trc_size_bytes + self.logger.warning(f"""'{trc_name}' Tag Size ({ + trc_tag_size + }) does not match 'curv_{trc_name}_field_length' ({ + curv_trc_field_length + }). Modifying the size...""") + new_trc_size_bytes = curv_trc_field_length.to_bytes( + 4, + byteorder='big') + new_contents[ + trc_position + 8: trc_position + 12 + ] = new_trc_size_bytes return new_contents def process_all_trc_tags(self, header_offset_position): @@ -187,9 +246,11 @@ def write_modified_file(self, new_file_contents): ".jp2", f"_modified_{timestamp}.jp2") with open(new_file_path, "wb") as new_file: new_file.write(new_file_contents) - self.logger.info(f"New JP2 file created with modifications: {new_file_path}") + self.logger.info(f"""New JP2 file created with modifications: { + new_file_path + }""") else: - self.logger.debug("No modifications were needed. No new file was created.") + self.logger.debug("No modifications needed. No new file created.") def read_jp2_file(self): """Main function to read, validate, and modify JP2 files."""