Skip to content

Commit

Permalink
Replace print statements with logging
Browse files Browse the repository at this point in the history
- defaults to only console logging
- defaults to WARNING level
   - configurable with: `export APP_LOG_LEVEL=[INFO|DEBUG]`
  • Loading branch information
awoods committed Oct 14, 2024
1 parent af8cd2d commit f932cce
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 29 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
*~
*.swp
logs/

dist/*
*/*.egg-info/*
__pycache__
.DS_Store
.DS_Store
42 changes: 42 additions & 0 deletions src/jp2_remediator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import socket
from datetime import datetime

LOG_FILE_BACKUP_COUNT = int(os.getenv('LOG_FILE_BACKUP_COUNT', '30'))
LOG_ROTATION = "midnight"

timestamp = datetime.today().strftime('%Y-%m-%d')


def configure_logger(name): # pragma: no cover
log_level = os.getenv("APP_LOG_LEVEL", "WARNING")
log_dir = os.getenv("LOG_DIR", "logs/")
# create log directory if it doesn't exist
if not os.path.exists(log_dir):
os.makedirs(log_dir)

log_file_path = os.path.join(log_dir, "jp2_remediator.log")
formatter = logging.Formatter(
'%(levelname)s - %(asctime)s - %(name)s - %(message)s')

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)

logger = logging.getLogger(name)
logger.addHandler(console_handler)
# Defaults to console logging
if os.getenv("CONSOLE_LOGGING_ONLY", "true") == "false":
# make log_file_path if it doesn't exist
# os.makedirs(log_file_path, exist_ok=True)
file_handler = TimedRotatingFileHandler(
filename=log_file_path,
when=LOG_ROTATION,
backupCount=LOG_FILE_BACKUP_COUNT
)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)

logger.setLevel(log_level)
return logger
58 changes: 30 additions & 28 deletions src/jp2_remediator/box_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import argparse
import boto3
import datetime
from jp2_remediator import configure_logger
from jpylyzer import jpylyzer
from jpylyzer import boxvalidator
from jpylyzer import byteconv
Expand All @@ -13,14 +14,15 @@ def __init__(self, file_path):
self.file_path = file_path
self.file_contents = self.read_file(file_path)
self.validator = None
self.logger = configure_logger(__name__)

def read_file(self, file_path):
"""Reads the file content from the given path."""
try:
with open(file_path, 'rb') as file:
return file.read()
except IOError as e:
print(f"Error reading file {file_path}: {e}")
self.logger.error(f"Error reading file {file_path}: {e}")
return None

def initialize_validator(self):
Expand All @@ -38,15 +40,15 @@ def check_boxes(self):
"""Checks for presence of 'jp2h' and 'colr' boxes in the file contents."""
jp2h_position = self.find_box_position(b'\x6a\x70\x32\x68') # search hex for 'jp2h'
if jp2h_position != -1:
print(f"'jp2h' found at byte position: {jp2h_position}")
self.logger.debug(f"'jp2h' found at byte position: {jp2h_position}")
else:
print("'jp2h' not found in the file.")
self.logger.debug("'jp2h' not found in the file.")

colr_position = self.find_box_position(b'\x63\x6f\x6c\x72') # search hex for 'colr'
if colr_position != -1:
print(f"'colr' found at byte position: {colr_position}")
self.logger.debug(f"'colr' found at byte position: {colr_position}")
else:
print("'colr' not found in the file.")
self.logger.debug("'colr' not found in the file.")

header_offset_position = self.process_colr_box(colr_position)

Expand All @@ -55,22 +57,22 @@ def check_boxes(self):
def process_colr_box(self, colr_position):
"""Processes the 'colr' box to determine header offset position."""
if colr_position != -1:
print(f"'colr' found at byte position: {colr_position}")
self.logger.debug(f"'colr' found at byte position: {colr_position}")
meth_byte_position = colr_position + 4 # ISO/IEC 15444-1:2019(E) Figure I.10 colr specification box, byte position of METH value after 'colr'
meth_value = self.file_contents[meth_byte_position]
print(f"'meth' value: {meth_value} at byte position: {meth_byte_position}")
self.logger.debug(f"'meth' value: {meth_value} at byte position: {meth_byte_position}")

if meth_value == 1:
header_offset_position = meth_byte_position + 7 # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, if meth is 1 then color profile starts at byte position 7 after 'colr'
print(f"'meth' is 1, setting header_offset_position to: {header_offset_position}")
self.logger.debug(f"'meth' is 1, setting header_offset_position to: {header_offset_position}")
elif meth_value == 2:
header_offset_position = meth_byte_position + 3 # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, if meth is 2 then color profile (ICC profile) starts at byte position 3 after 'colr'
print(f"'meth' is 2, setting header_offset_position to: {header_offset_position} (start of ICC profile)")
self.logger.debug(f"'meth' is 2, setting header_offset_position to: {header_offset_position} (start of ICC profile)")
else:
print(f"'meth' value {meth_value} is not recognized (must be 1 or 2).")
self.logger.debug(f"'meth' value {meth_value} is not recognized (must be 1 or 2).")
header_offset_position = None
else:
print("'colr' not found in the file.")
self.logger.debug("'colr' not found in the file.")
header_offset_position = None

return header_offset_position
Expand All @@ -79,55 +81,55 @@ def process_trc_tag(self, trc_hex, trc_name, new_contents, header_offset_positio
"""Processes the TRC tag and modifies contents if necessary."""
trc_position = new_contents.find(trc_hex)
if trc_position == -1:
print(f"'{trc_name}' not found in the file.")
self.logger.debug(f"'{trc_name}' not found in the file.")
return new_contents

print(f"'{trc_name}' found at byte position: {trc_position}")
self.logger.debug(f"'{trc_name}' found at byte position: {trc_position}")
trc_tag_entry = new_contents[trc_position:trc_position + 12] # 12-byte tag entry length

if len(trc_tag_entry) != 12:
print(f"Could not extract the full 12-byte '{trc_name}' tag entry.")
self.logger.debug(f"Could not extract the full 12-byte '{trc_name}' tag entry.")
return new_contents

trc_tag_signature = trc_tag_entry[0:4] # ICC.1:2022 Table 24 tag signature, e.g. 'rTRC'
trc_tag_offset = int.from_bytes(trc_tag_entry[4:8], byteorder='big') # ICC.1:2022 Table 24 tag offset
trc_tag_size = int.from_bytes(trc_tag_entry[8:12], byteorder='big') # ICC.1:2022 Table 24 tag size
print(f"'{trc_name}' Tag Signature: {trc_tag_signature}")
print(f"'{trc_name}' Tag Offset: {trc_tag_offset}")
print(f"'{trc_name}' Tag Size: {trc_tag_size}")
self.logger.debug(f"'{trc_name}' Tag Signature: {trc_tag_signature}")
self.logger.debug(f"'{trc_name}' Tag Offset: {trc_tag_offset}")
self.logger.debug(f"'{trc_name}' Tag Size: {trc_tag_size}")

if header_offset_position is None:
print(f"Cannot calculate 'curv_{trc_name}_position' due to an unrecognized 'meth' value.")
self.logger.debug(f"Cannot calculate 'curv_{trc_name}_position' due to an unrecognized 'meth' value.")
return new_contents

curv_trc_position = trc_tag_offset + header_offset_position # start of curv profile data
curv_profile = new_contents[curv_trc_position:curv_trc_position + 12] # 12-byte curv profile data length

if len(curv_profile) < 12:
print(f"Could not read the full 'curv' profile data for {trc_name}.")
self.logger.debug(f"Could not read the full 'curv' profile data for {trc_name}.")
return new_contents

curv_signature = curv_profile[0:4].decode('utf-8') # ICC.1:2022 Table 35 tag signature
curv_reserved = int.from_bytes(curv_profile[4:8], byteorder='big') # ICC.1:2022 Table 35 reserved 0's
curv_trc_gamma_n = int.from_bytes(curv_profile[8:12], byteorder='big') # # ICC.1:2022 Table 35 n value

print(f"'curv' Profile Signature for {trc_name}: {curv_signature}")
print(f"'curv' Reserved Value: {curv_reserved}")
print(f"'curv_{trc_name}_gamma_n' Value: {curv_trc_gamma_n}")
self.logger.debug(f"'curv' Profile Signature for {trc_name}: {curv_signature}")
self.logger.debug(f"'curv' Reserved Value: {curv_reserved}")
self.logger.debug(f"'curv_{trc_name}_gamma_n' Value: {curv_trc_gamma_n}")

curv_trc_field_length = curv_trc_gamma_n * 2 + 12 # ICC.1:2022 Table 35 2n field length
print(f"'curv_{trc_name}_field_length': {curv_trc_field_length}")
self.logger.debug(f"'curv_{trc_name}_field_length': {curv_trc_field_length}")

# Check if curv_trc_gamma_n is not 1 and ask for confirmation to proceed, loops through all TRC tags
if curv_trc_gamma_n != 1:
print(f"Warning: 'curv_{trc_name}_gamma_n' value is {curv_trc_gamma_n}, expected 1.")
self.logger.warning(f"Warning: 'curv_{trc_name}_gamma_n' value is {curv_trc_gamma_n}, expected 1.")
proceed = input(f"Do you want to proceed with fixing the file {self.file_path}? (y/n): ").lower()
if proceed != 'y':
print(f"Skipping fixing for {self.file_path}")
return new_contents

if trc_tag_size != curv_trc_field_length:
print(f"'{trc_name}' Tag Size ({trc_tag_size}) does not match 'curv_{trc_name}_field_length' ({curv_trc_field_length}). Modifying the size...")
self.logger.warning(f"'{trc_name}' Tag Size ({trc_tag_size}) does not match 'curv_{trc_name}_field_length' ({curv_trc_field_length}). Modifying the size...")
new_trc_size_bytes = curv_trc_field_length.to_bytes(4, byteorder='big')
new_contents[trc_position + 8: trc_position + 12] = new_trc_size_bytes

Expand All @@ -154,9 +156,9 @@ def write_modified_file(self, new_file_contents):
new_file_path = self.file_path.replace(".jp2", f"_modified_{timestamp}.jp2")
with open(new_file_path, 'wb') as new_file:
new_file.write(new_file_contents)
print(f"New JP2 file created with modifications: {new_file_path}")
self.logger.info(f"New JP2 file created with modifications: {new_file_path}")
else:
print("No modifications were needed. No new file was created.")
self.logger.debug("No modifications were needed. No new file was created.")

def read_jp2_file(self):
"""Main function to read, validate, and modify JP2 files."""
Expand All @@ -165,7 +167,7 @@ def read_jp2_file(self):

self.initialize_validator()
is_valid = self.validator._isValid()
print("Is file valid?", is_valid)
self.logger.info("Is file valid?", is_valid)

header_offset_position = self.check_boxes()
new_file_contents = self.process_all_trc_tags(header_offset_position)
Expand Down

0 comments on commit f932cce

Please sign in to comment.