From 4a2ccba9ff4701f4496a29e25b9d3e43d54e908f Mon Sep 17 00:00:00 2001 From: Andrew Woods Date: Fri, 1 Nov 2024 14:44:24 +0100 Subject: [PATCH] Replace print statements with logging (#4) * Replace print statements with logging - defaults to only console logging - defaults to WARNING level - configurable with: `export APP_LOG_LEVEL=[INFO|DEBUG]` * Merge awoods-logging into logging (#3) * linter for box reader after logging * comment out diff cover remove socket * skip processing of files where n value is not 1 * tests with 65% cvg * add in github actions install of diff cover globally * update paths for diff cover * test diff cover install and check path * uncomment diff cover test * uncomment diff cover test 2 * new coverage test * full test coverage * flake8 passes all files * check flake8 version * mod flake8 command for 120 line length * add additional test coverage, stops early if not 85 --------- Co-authored-by: kim pham --- .github/workflows/test.yml | 24 +- .gitignore | 4 + README.md | 16 +- pyproject.toml | 2 +- requirements.txt | 5 + src/jp2_remediator/__init__.py | 41 ++ src/jp2_remediator/box_reader.py | 289 ++++--------- src/jp2_remediator/main.py | 52 ++- .../tests/unit/test_box_reader.py | 380 +++++++++++++++++- 9 files changed, 559 insertions(+), 254 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9a8fa29..8d88441 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -32,8 +32,9 @@ jobs: - name: Run flake8 run: | pip install flake8 + flake8 --version # stop the build if there are flake8 errors - flake8 . --count --show-source --statistics + flake8 . --count --show-source --statistics --max-line-length 120 - name: Run unit tests run: | @@ -45,25 +46,26 @@ jobs: pip install coverage python -m coverage run -p -m pytest src/jp2_remediator/tests/unit python -m coverage combine - python -m coverage report -m --skip-covered + python -m coverage report -m --skip-covered --fail-under=85 python -m coverage xml # Fetch base branch for comparison (e.g., main) - name: Fetch base branch run: git fetch origin main - # Compare coverage with the base branch + - name: Install diff-cover + run: | + pip install --user diff-cover + find $HOME -name "diff-cover" || echo "diff-cover not found" + + - name: Add diff-cover to PATH + run: echo "$HOME/.local/bin" >> $GITHUB_PATH + + # Compare coverage with the base branch, if decreases fails, if under 85 percent fails - name: Compare coverage run: | - pip install diff-cover git checkout main python -m coverage run -p -m pytest src/jp2_remediator/tests/unit python -m coverage xml -o coverage-base.xml git checkout - - python diff-cover --compare-branch=main coverage.xml - - # Fail if coverage decreases - - name: Fail if coverage decreases - run: | - python diff-cover --compare-branch=main coverage.xml --fail-under=100 - + diff-cover --compare-branch=main coverage.xml --fail-under=85 \ No newline at end of file diff --git a/.gitignore b/.gitignore index 7dc3f5b..20ebd7e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,14 @@ *~ *.swp + +logs/ + input/* output/* .coverage coverage.* + dist/* */*.egg-info/* __pycache__ diff --git a/README.md b/README.md index 189a5d4..70afbee 100644 --- a/README.md +++ b/README.md @@ -16,29 +16,29 @@ pip install jp2_remediator==0.0.2 ## Usage ## Process one file -`python3 box_reader.py --file tests/test-images/7514499.jp2` +`python3 main.py --file tests/test-images/7514499.jp2` -`python3 box_reader.py --file tests/test-images/481014278.jp2` +`python3 main.py 
--file tests/test-images/481014278.jp2` ## Process directory -`python3 box_reader.py --directory tests/test-images/` +`python3 main.py --directory tests/test-images/` ## Process Amazon S3 bucket -`python3 box_reader.py --bucket your-bucket-name --prefix optional-prefix` +`python3 main.py --bucket your-bucket-name --prefix optional-prefix` ## Process all .jp2 files in the bucket: -`python3 box_reader.py --bucket remediation-folder` +`python3 main.py --bucket remediation-folder` ## Process only files with a specific prefix (folder): -`python3 box_reader.py --bucket remediation-folder --prefix testbatch_20240923` +`python3 main.py --bucket remediation-folder --prefix testbatch_20240923` -`python3 box_reader.py --help` +`python3 main.py --help` ## Run Tests `python3 test_aws_connection.py` ### Run from src folder -`python3 -m unittest jp2_remediator.tests.test_box_reader` +`python3 -m unittest jp2_remediator.tests.unit.test_box_reader` ## Docker environment diff --git a/pyproject.toml b/pyproject.toml index e3ee6b3..4ee318d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,4 +34,4 @@ omit = [ [tool.project-paths] dir_unit_out = "src/jp2_remediator/tests/out/" -dir_unit_resources = "src/jp2_remediator/tests/resources/" +dir_unit_resources = "src/jp2_remediator/tests/resources/" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 0402430..b137350 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,15 @@ boto3==1.35.39 botocore==1.35.39 +flake8==7.1.1 jmespath==1.0.1 jpylyzer==2.2.1 +mccabe==0.7.0 project-paths==1.1.1 +pycodestyle==2.12.1 +pyflakes==3.2.0 python-dateutil==2.9.0.post0 s3transfer==0.10.3 +setuptools==73.0.1 six==1.16.0 toml==0.10.2 urllib3==2.2.3 diff --git a/src/jp2_remediator/__init__.py b/src/jp2_remediator/__init__.py index e69de29..ce40ebb 100644 --- a/src/jp2_remediator/__init__.py +++ b/src/jp2_remediator/__init__.py @@ -0,0 +1,41 @@ +import logging +from logging.handlers import TimedRotatingFileHandler +import os +from datetime import datetime + +LOG_FILE_BACKUP_COUNT = int(os.getenv('LOG_FILE_BACKUP_COUNT', '30')) +LOG_ROTATION = "midnight" + +timestamp = datetime.today().strftime('%Y-%m-%d') + + +def configure_logger(name): # pragma: no cover + log_level = os.getenv("APP_LOG_LEVEL", "WARNING") + log_dir = os.getenv("LOG_DIR", "logs/") + # create log directory if it doesn't exist + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + log_file_path = os.path.join(log_dir, "jp2_remediator.log") + formatter = logging.Formatter( + '%(levelname)s - %(asctime)s - %(name)s - %(message)s') + + console_handler = logging.StreamHandler() + console_handler.setFormatter(formatter) + + logger = logging.getLogger(name) + logger.addHandler(console_handler) + # Defaults to console logging + if os.getenv("CONSOLE_LOGGING_ONLY", "true") == "false": + # make log_file_path if it doesn't exist + # os.makedirs(log_file_path, exist_ok=True) + file_handler = TimedRotatingFileHandler( + filename=log_file_path, + when=LOG_ROTATION, + backupCount=LOG_FILE_BACKUP_COUNT + ) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + logger.setLevel(log_level) + return logger diff --git a/src/jp2_remediator/box_reader.py b/src/jp2_remediator/box_reader.py index 63f252e..602e26b 100644 --- a/src/jp2_remediator/box_reader.py +++ b/src/jp2_remediator/box_reader.py @@ -1,235 +1,153 @@ -# import sys import os -import argparse import boto3 import datetime - -# from jpylyzer import jpylyzer +from jp2_remediator import configure_logger 
from jpylyzer import boxvalidator -# from jpylyzer import byteconv - class BoxReader: def __init__(self, file_path): - """Initializes BoxReader with a file path.""" + # Initializes BoxReader with a file path. self.file_path = file_path self.file_contents = self.read_file(file_path) self.validator = None + self.logger = configure_logger(__name__) def read_file(self, file_path): - """Reads the file content from the given path.""" + # Reads the file content from the given path. try: with open(file_path, "rb") as file: return file.read() except IOError as e: - print(f"Error reading file {file_path}: {e}") + self.logger.error(f"Error reading file {file_path}: {e}") return None def initialize_validator(self): - """Initializes the jpylyzer BoxValidator for JP2 file validation.""" + # Initializes the jpylyzer BoxValidator for JP2 file validation. options = { "validationFormat": "jp2", "verboseFlag": True, "nullxmlFlag": False, "packetmarkersFlag": False, } - self.validator = boxvalidator.BoxValidator( - options, "JP2", self.file_contents) + self.validator = boxvalidator.BoxValidator(options, "JP2", self.file_contents) self.validator.validate() return self.validator def find_box_position(self, box_hex): - """Finds the position of the specified box in the file.""" + # Finds the position of the specified box in the file. return self.file_contents.find(box_hex) def check_boxes(self): - """Checks for presence of 'jp2h' and 'colr' boxes in file contents.""" - jp2h_position = self.find_box_position( - b"\x6a\x70\x32\x68" - ) # search hex for 'jp2h' + # Checks for presence of 'jp2h' and 'colr' boxes in file contents. + jp2h_position = self.find_box_position(b"\x6a\x70\x32\x68") # search hex for 'jp2h' if jp2h_position != -1: - print(f"'jp2h' found at byte position: {jp2h_position}") + self.logger.debug(f"'jp2h' found at byte position: {jp2h_position}") else: - print("'jp2h' not found in the file.") + self.logger.debug("'jp2h' not found in the file.") - colr_position = self.find_box_position( - b"\x63\x6f\x6c\x72" - ) # search hex for 'colr' + colr_position = self.find_box_position(b"\x63\x6f\x6c\x72") # search hex for 'colr' if colr_position != -1: - print(f"'colr' found at byte position: {colr_position}") + self.logger.debug(f"'colr' found at byte position: {colr_position}") else: - print("'colr' not found in the file.") + self.logger.debug("'colr' not found in the file.") header_offset_position = self.process_colr_box(colr_position) return header_offset_position def process_colr_box(self, colr_position): - """Processes the 'colr' box to determine header offset position.""" + # Processes the 'colr' box to determine header offset position. 
if colr_position != -1: - print(f"'colr' found at byte position: {colr_position}") - meth_byte_position = ( - colr_position + 4 - ) - """ ISO/IEC 15444-1:2019(E) Figure I.10 colr specification box - byte position of METH value after 'colr' """ + self.logger.debug(f"'colr' found at byte position: {colr_position}") + meth_byte_position = colr_position + 4 + # ISO/IEC 15444-1:2019(E) Figure I.10 colr specification box + # byte position of METH value after 'colr' meth_value = self.file_contents[meth_byte_position] - print( - f"""'meth' value: {meth_value} at byte position: { - meth_byte_position - }""" - ) + self.logger.debug(f"'meth' value: {meth_value} at byte position: {meth_byte_position}") + if meth_value == 1: - header_offset_position = ( - meth_byte_position + 7 - ) - """ ISO/IEC 15444-1:2019(E) Table I.11 colr specification box - if meth is 1 then color profile starts - at byte position 7 after 'colr' """ - print( - f"""'meth' is 1, setting header_offset_position to: { - header_offset_position - }""" - ) + header_offset_position = meth_byte_position + 7 + # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, + # if meth is 1 then color profile starts at byte position 7 after 'colr' + self.logger.debug(f"'meth' is 1, setting header_offset_position to: {header_offset_position}") elif meth_value == 2: - header_offset_position = ( - meth_byte_position + 3 - ) - """ ISO/IEC 15444-1:2019(E) Table I.11 colr specification box - if meth is 2 then color profile (ICC profile) starts - at byte position 3 after 'colr' """ - - print( - f"""'meth' is 2, setting header_offset_position to: { - header_offset_position - } (start of ICC profile)""" - ) + header_offset_position = meth_byte_position + 3 + # ISO/IEC 15444-1:2019(E) Table I.11 colr specification box, + # if meth is 2 then color profile (ICC profile) starts at byte position 3 after 'colr' + self.logger.debug(f"""'meth' is 2, setting header_offset_position to: { + header_offset_position} (start of ICC profile)""") else: - print( - f"""'meth' value { - meth_value - } is not recognized (must be 1 or 2).""" - ) + self.logger.debug(f"'meth' value {meth_value} is not recognized (must be 1 or 2).") header_offset_position = None else: - print("'colr' not found in the file.") + self.logger.debug("'colr' not found in the file.") header_offset_position = None return header_offset_position - def process_trc_tag(self, - trc_hex, - trc_name, - new_contents, - header_offset_position): - """Processes the TRC tag and modifies contents if necessary.""" + def process_trc_tag(self, trc_hex, trc_name, new_contents, header_offset_position): + # Processes the TRC tag and modifies contents if necessary. trc_position = new_contents.find(trc_hex) if trc_position == -1: - print(f"'{trc_name}' not found in the file.") + self.logger.debug(f"'{trc_name}' not found in the file.") return new_contents - print(f"'{trc_name}' found at byte position: {trc_position}") - trc_tag_entry = new_contents[trc_position: trc_position + 12] + self.logger.debug(f"'{trc_name}' found at byte position: {trc_position}") + trc_tag_entry = new_contents[trc_position:trc_position + 12] # 12-byte tag entry length if len(trc_tag_entry) != 12: - print( - f"Could not extract the full 12-byte '{trc_name}' tag entry." - ) + self.logger.debug(f"Could not extract the full 12-byte '{trc_name}' tag entry.") return new_contents - trc_tag_signature = trc_tag_entry[ - 0:4 - ] # ICC.1:2022 Table 24 tag signature, e.g. 
'rTRC' - trc_tag_offset = int.from_bytes( - trc_tag_entry[4:8], byteorder="big" - ) # ICC.1:2022 Table 24 tag offset - trc_tag_size = int.from_bytes( - trc_tag_entry[8:12], byteorder="big" - ) # ICC.1:2022 Table 24 tag size - print(f"'{trc_name}' Tag Signature: {trc_tag_signature}") - print(f"'{trc_name}' Tag Offset: {trc_tag_offset}") - print(f"'{trc_name}' Tag Size: {trc_tag_size}") + trc_tag_signature = trc_tag_entry[0:4] + # ICC.1:2022 Table 24 tag signature, e.g. 'rTRC' + trc_tag_offset = int.from_bytes(trc_tag_entry[4:8], byteorder='big') + # ICC.1:2022 Table 24 tag offset + trc_tag_size = int.from_bytes(trc_tag_entry[8:12], byteorder='big') + # ICC.1:2022 Table 24 tag size + self.logger.debug(f"'{trc_name}' Tag Signature: {trc_tag_signature}") + self.logger.debug(f"'{trc_name}' Tag Offset: {trc_tag_offset}") + self.logger.debug(f"'{trc_name}' Tag Size: {trc_tag_size}") if header_offset_position is None: - print( - f"""Cannot calculate 'curv_{ - trc_name - }_position' due to an unrecognized 'meth' value.""" - ) + self.logger.debug(f"Cannot calculate 'curv_{trc_name}_position' due to an unrecognized 'meth' value.") return new_contents - curv_trc_position = ( - trc_tag_offset + header_offset_position - ) # start of curv profile data - curv_profile = new_contents[ - curv_trc_position: curv_trc_position + 12 - ] # 12-byte curv profile data length + curv_trc_position = trc_tag_offset + header_offset_position # start of curv profile data + curv_profile = new_contents[curv_trc_position: curv_trc_position + 12] # 12-byte curv profile data length if len(curv_profile) < 12: - print( - f"Could not read the full 'curv' profile data for {trc_name}." - ) + self.logger.debug(f"Could not read the full 'curv' profile data for {trc_name}.") return new_contents - curv_signature = curv_profile[0:4].decode( - "utf-8" - ) # ICC.1:2022 Table 35 tag signature - curv_reserved = int.from_bytes( - curv_profile[4:8], byteorder="big" - ) # ICC.1:2022 Table 35 reserved 0's - curv_trc_gamma_n = int.from_bytes( - curv_profile[8:12], byteorder="big" - ) # # ICC.1:2022 Table 35 n value - - print(f"'curv' Profile Signature for {trc_name}: {curv_signature}") - print(f"'curv' Reserved Value: {curv_reserved}") - print(f"'curv_{trc_name}_gamma_n' Value: {curv_trc_gamma_n}") - - curv_trc_field_length = ( - curv_trc_gamma_n * 2 + 12 - ) # ICC.1:2022 Table 35 2n field length - print(f"'curv_{trc_name}_field_length': {curv_trc_field_length}") - - """Check if curv_trc_gamma_n is not 1 and ask - for confirmation to proceed, loops through all TRC tags""" + curv_signature = curv_profile[0:4].decode("utf-8") # ICC.1:2022 Table 35 tag signature + curv_reserved = int.from_bytes(curv_profile[4:8], byteorder="big") # ICC.1:2022 Table 35 reserved 0's + curv_trc_gamma_n = int.from_bytes(curv_profile[8:12], byteorder="big") # ICC.1:2022 Table 35 n value + + self.logger.debug(f"'curv' Profile Signature for {trc_name}: {curv_signature}") + self.logger.debug(f"'curv' Reserved Value: {curv_reserved}") + self.logger.debug(f"'curv_{trc_name}_gamma_n' Value: {curv_trc_gamma_n}") + curv_trc_field_length = curv_trc_gamma_n * 2 + 12 # ICC.1:2022 Table 35 2n field length + self.logger.debug(f"'curv_{trc_name}_field_length': {curv_trc_field_length}") + + # If curv_trc_gamma_n is not 1, skip processing this file if curv_trc_gamma_n != 1: - print( - f"""Warning: 'curv_{trc_name}_gamma_n' value is { - curv_trc_gamma_n - }, expected 1.""" - ) - proceed = input( - f"""Do you want to proceed with fixing the file { - self.file_path -
} (y/n): """ - ).lower() - if proceed != "y": - print(f"Skipping fixing for {self.file_path}") - return new_contents + self.logger.warning(f"""Warning: In file '{self.file_path}', 'curv_{trc_name}_gamma_n' value is { + curv_trc_gamma_n + }, expected 1. Modification may be required.""") + return new_contents if trc_tag_size != curv_trc_field_length: - print( - f"""'{trc_name}' Tag Size ({ - trc_tag_size - }) does not match 'curv_{ - trc_name - }_field_length' ({ - curv_trc_field_length - }). Modifying size-""" - ) - new_trc_size_bytes = curv_trc_field_length.to_bytes( - 4, - byteorder="big") - new_contents[ - trc_position + 8: trc_position + 12 - ] = new_trc_size_bytes - + self.logger.warning(f"""'{trc_name}' Tag Size ({trc_tag_size}) does not match 'curv_{ + trc_name}_field_length' ({curv_trc_field_length}). Modifying the size...""") + new_trc_size_bytes = curv_trc_field_length.to_bytes(4, byteorder='big') + new_contents[trc_position + 8: trc_position + 12] = new_trc_size_bytes return new_contents def process_all_trc_tags(self, header_offset_position): - """Function to process 'TRC' tags (rTRC, gTRC, bTRC).""" + # Function to process 'TRC' tags (rTRC, gTRC, bTRC). new_file_contents = bytearray(self.file_contents) trc_tags = { b"\x72\x54\x52\x43": "rTRC", # search hex for 'rTRC' @@ -238,34 +156,29 @@ def process_all_trc_tags(self, header_offset_position): } for trc_hex, trc_name in trc_tags.items(): - new_file_contents = self.process_trc_tag( - trc_hex, trc_name, new_file_contents, header_offset_position - ) + new_file_contents = self.process_trc_tag(trc_hex, trc_name, new_file_contents, header_offset_position) return new_file_contents def write_modified_file(self, new_file_contents): - """Writes modified file contents to new file if changes were made.""" + # Writes modified file contents to new file if changes were made. if new_file_contents != self.file_contents: - timestamp = datetime.datetime.now().strftime( - "%Y%m%d" - ) # use "%Y%m%d_%H%M%S" for more precision - new_file_path = self.file_path.replace( - ".jp2", f"_modified_{timestamp}.jp2") + timestamp = datetime.datetime.now().strftime("%Y%m%d") # use "%Y%m%d_%H%M%S" for more precision + new_file_path = self.file_path.replace(".jp2", f"_modified_{timestamp}.jp2") with open(new_file_path, "wb") as new_file: new_file.write(new_file_contents) - print(f"New JP2 file created with modifications: {new_file_path}") + self.logger.info(f"New JP2 file created with modifications: {new_file_path}") else: - print("No modifications were needed. No new file was created.") + self.logger.debug("No modifications needed. No new file created.") def read_jp2_file(self): - """Main function to read, validate, and modify JP2 files.""" + # Main function to read, validate, and modify JP2 files. if not self.file_contents: return self.initialize_validator() is_valid = self.validator._isValid() - print("Is file valid?", is_valid) + self.logger.info("Is file valid?", is_valid) header_offset_position = self.check_boxes() new_file_contents = self.process_all_trc_tags(header_offset_position) @@ -274,7 +187,7 @@ def read_jp2_file(self): def process_directory(directory_path): - """Process all JP2 files in a given directory.""" + # Process all JP2 files in a given directory. 
for root, _, files in os.walk(directory_path): for file in files: if file.lower().endswith(".jp2"): @@ -285,7 +198,7 @@ def process_directory(directory_path): def process_s3_bucket(bucket_name, prefix=""): - """Process all JP2 files in a given S3 bucket.""" + # Process all JP2 files in a given S3 bucket. s3 = boto3.client("s3") response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix) @@ -293,47 +206,15 @@ def process_s3_bucket(bucket_name, prefix=""): for obj in response["Contents"]: if obj["Key"].lower().endswith(".jp2"): file_path = obj["Key"] - print(f"""Processing file: {file_path} from bucket { - bucket_name - }""") + print(f"Processing file: {file_path} from bucket {bucket_name}") download_path = f"/tmp/{os.path.basename(file_path)}" s3.download_file(bucket_name, file_path, download_path) reader = BoxReader(download_path) reader.read_jp2_file() # Optionally, upload modified file back to S3 - timestamp = datetime.datetime.now().strftime( - "%Y%m%d" - ) # use "%Y%m%d_%H%M%S" for more precision + timestamp = datetime.datetime.now().strftime("%Y%m%d") # use "%Y%m%d_%H%M%S" for more precision s3.upload_file( - download_path.replace( - ".jp2", f"_modified_{timestamp}.jp2" - ), + download_path.replace(".jp2", f"_modified_{timestamp}.jp2"), bucket_name, file_path.replace(".jp2", f"_modified_{timestamp}.jp2"), ) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="JP2 file processor") - parser.add_argument("--file", help="Path to a single JP2 file to process.") - parser.add_argument( - "--directory", help="Path to a directory of JP2 files to process." - ) - parser.add_argument( - "--bucket", help="Name of the AWS S3 bucket to process JP2 files from." - ) - parser.add_argument( - "--prefix", help="Prefix of files in the AWS S3 bucket (optional)." - ) - - args = parser.parse_args() - - if args.file: - reader = BoxReader(args.file) - reader.read_jp2_file() - elif args.directory: - process_directory(args.directory) - elif args.bucket: - process_s3_bucket(args.bucket, args.prefix) - else: - print("Please specify either --file, --directory, or --bucket.") diff --git a/src/jp2_remediator/main.py b/src/jp2_remediator/main.py index 426e551..a64a55e 100644 --- a/src/jp2_remediator/main.py +++ b/src/jp2_remediator/main.py @@ -1,34 +1,32 @@ -import sys -import os +import argparse +from jp2_remediator.box_reader import BoxReader, process_directory, process_s3_bucket def main(): - if len(sys.argv) != 3: - print("Usage: python script.py ") - sys.exit(1) - - folder_path1 = sys.argv[1] - folder_path2 = sys.argv[2] - - if not os.path.isdir(folder_path1): - print(f"Error: {folder_path1} is not a valid directory.") - sys.exit(1) - - if not os.path.isdir(folder_path2): - print(f"Error: {folder_path2} is not a valid directory.") - sys.exit(1) - - print(f"Folder 1: {folder_path1}") - print(f"Folder 2: {folder_path2}") + parser = argparse.ArgumentParser(description="JP2 file processor") + parser.add_argument("--file", help="Path to a single JP2 file to process.") + parser.add_argument( + "--directory", help="Path to a directory of JP2 files to process." + ) + parser.add_argument( + "--bucket", help="Name of the AWS S3 bucket to process JP2 files from." + ) + parser.add_argument( + "--prefix", help="Prefix of files in the AWS S3 bucket (optional)." 
+ ) + + args = parser.parse_args() + + if args.file: + reader = BoxReader(args.file) + reader.read_jp2_file() + elif args.directory: + process_directory(args.directory) + elif args.bucket: + process_s3_bucket(args.bucket, args.prefix) + else: + print("Please specify either --file, --directory, or --bucket.") if __name__ == "__main__": main() - - -def hello_world(): - print("Hello, world!") - - -def add_one(number): - return number + 1 diff --git a/src/jp2_remediator/tests/unit/test_box_reader.py b/src/jp2_remediator/tests/unit/test_box_reader.py index 94bc3a7..ba1bf86 100644 --- a/src/jp2_remediator/tests/unit/test_box_reader.py +++ b/src/jp2_remediator/tests/unit/test_box_reader.py @@ -1,7 +1,7 @@ import unittest import os -from unittest.mock import patch, mock_open -from jp2_remediator.box_reader import BoxReader +from unittest.mock import patch, mock_open, MagicMock +from jp2_remediator.box_reader import BoxReader, process_directory, process_s3_bucket from jpylyzer import boxvalidator from project_paths import paths import datetime @@ -13,15 +13,18 @@ class TestJP2ProcessingWithFile(unittest.TestCase): def setUp(self): - """Set up a BoxReader instance for each test.""" + # Set up a BoxReader instance for each test. self.reader = BoxReader(TEST_DATA_PATH) + self.reader.logger = MagicMock() # Mock logger directly + # Test for read_file method def test_read_file_with_valid_path(self): # Test reading a valid test file result = self.reader.read_file(TEST_DATA_PATH) self.assertIsNotNone(result) # Ensure file content is not None self.assertIsInstance(result, bytes) # Ensure file content is in bytes + # Test for initialize_validator method def test_initialize_validator_with_file_content(self): # Read file content file_contents = self.reader.read_file(TEST_DATA_PATH) @@ -33,6 +36,7 @@ def test_initialize_validator_with_file_content(self): validator = self.reader.initialize_validator() self.assertIsInstance(validator, boxvalidator.BoxValidator) + # Test for find_box_position method def test_find_box_position_in_file(self): # Read file content file_contents = self.reader.read_file(TEST_DATA_PATH) @@ -46,6 +50,7 @@ def test_find_box_position_in_file(self): position = self.reader.find_box_position(b"\x6a\x70\x32\x68") self.assertNotEqual(position, -1) # Ensure that the box is found + # Test for check_boxes method def test_check_boxes_in_file(self): # Read file content file_contents = self.reader.read_file(TEST_DATA_PATH) @@ -59,6 +64,7 @@ def test_check_boxes_in_file(self): header_offset_position = self.reader.check_boxes() self.assertIsNotNone(header_offset_position) + # Test for process_colr_box method def test_process_colr_box_in_file(self): # Read file content file_contents = self.reader.read_file(TEST_DATA_PATH) @@ -77,6 +83,7 @@ def test_process_colr_box_in_file(self): header_offset_position = self.reader.process_colr_box(colr_position) self.assertIsNotNone(header_offset_position) + # Test for write_modified_file method @patch( "builtins.open", new_callable=mock_open, @@ -106,6 +113,373 @@ def test_write_modified_file_with_changes(self, mock_file): # Ensure the contents were written correctly mock_file().write.assert_called_once_with(b"sample content modified") + # Test for read_file method with IOError + @patch("builtins.open", new_callable=mock_open) + def test_read_file_with_io_error(self, mock_open_func): + # Mock open and read a file and get an error + mock_open_func.side_effect = IOError("Unable to read file") + result = self.reader.read_file("nonexistent.jp2") + 
self.assertIsNone(result) + self.reader.logger.error.assert_called_once_with( + "Error reading file nonexistent.jp2: Unable to read file" + ) + + # Test for process_all_trc_tags method + def test_process_all_trc_tags(self): + # Create TRC tags to process + trc_tags = (b"\x72\x54\x52\x43" + b"\x67\x54\x52\x43" + + b"\x62\x54\x52\x43") + self.reader.file_contents = bytearray(b"\x00" * 50 + trc_tags + + b"\x00" * 50) + header_offset_position = 50 + modified_contents = self.reader.process_all_trc_tags( + header_offset_position + ) + self.assertEqual(modified_contents, self.reader.file_contents) + + # Test for process_directory function + @patch("jp2_remediator.box_reader.BoxReader") + @patch("os.walk", return_value=[("root", [], ["file1.jp2", "file2.jp2"])]) + @patch("builtins.print") + def test_process_directory_with_multiple_files( + self, mock_print, mock_os_walk, mock_box_reader + ): + # Process a dir with multiple jp2 files + # Mock the logger for each BoxReader instance created + mock_box_reader.return_value.logger = MagicMock() + + # Call process_directory with a dummy path + process_directory("dummy_path") + + # Check that each JP2 file in the directory was processed + mock_print.assert_any_call("Processing file: root/file1.jp2") + mock_print.assert_any_call("Processing file: root/file2.jp2") + + # Ensure each BoxReader instance had its read_jp2_file method called + self.assertEqual( + mock_box_reader.return_value.read_jp2_file.call_count, 2 + ) + + # Test for check_boxes method logging when 'jp2h' not found + def test_jp2h_not_found_logging(self): + # Set up file_contents to simulate a missing 'jp2h' box + self.reader.file_contents = b"\x00" * 100 + # Arbitrary content without 'jp2h' + # Call the method that should log the debug message + self.reader.check_boxes() + # Check that the specific debug message was logged + self.reader.logger.debug.assert_any_call( + "'jp2h' not found in the file." + ) + + # Test for write_modified_file method when no changes + @patch("builtins.open", new_callable=mock_open) + def test_write_modified_file_no_changes(self, mock_file): + # Set the file contents to simulate a situation with no modifications + original_content = b"original content" + self.reader.file_contents = original_content + + # Call write_modified_file with identical content + self.reader.write_modified_file(original_content) + + # Ensure that no file was written because there were no modifications + mock_file.assert_not_called() + + # Check that the specific debug message was logged + self.reader.logger.debug.assert_called_once_with( + "No modifications needed. No new file created." 
+ ) + + # Test for process_colr_box method when meth_value == 1 + def test_process_colr_box_meth_value_1(self): + # Create file contents with an exactly positioned 'colr' box and meth_value = 1 + # 'colr' starts at byte 100; the meth byte immediately follows the 4-byte box type + self.reader.file_contents = ( + b"\x00" * 100 + # Padding before 'colr' box + b"\x63\x6f\x6c\x72" + # 'colr' box + b"\x01" # meth_value set to 1 + ) + colr_position = 100 + header_offset_position = self.reader.process_colr_box(colr_position) + expected_position = colr_position + 4 + 7 + # Assert the expected header offset position + self.assertEqual(header_offset_position, expected_position) + self.reader.logger.debug.assert_any_call( + f"'meth' is 1, setting header_offset_position to: {expected_position}" + ) + + # Test for process_colr_box method with unrecognized meth_value + def test_process_colr_box_unrecognized_meth_value(self): + self.reader.file_contents = ( + b"\x00" * 100 + # Padding before 'colr' box + b"\x63\x6f\x6c\x72" + # 'colr' box + b"\x03" # meth_value set to 3 + ) + colr_position = 100 + header_offset_position = self.reader.process_colr_box(colr_position) + self.assertIsNone(header_offset_position) + self.reader.logger.debug.assert_any_call( + "'meth' value 3 is not recognized (must be 1 or 2)." + ) + + # Test for process_colr_box method when 'colr' box is missing + def test_process_colr_box_missing(self): + self.reader.file_contents = b"\x00" * 100 + colr_position = -1 + header_offset_position = self.reader.process_colr_box(colr_position) + self.assertIsNone(header_offset_position) + self.reader.logger.debug.assert_any_call("'colr' not found in the file.") + + # Test for process_trc_tag method with incomplete trc_tag_entry + def test_process_trc_tag_incomplete_entry(self): + # Prepare the test data + self.reader.file_contents = b"\x00" * 100 + b"\x72\x54\x52\x43" + b"\x00" * 6 + trc_hex = b"\x72\x54\x52\x43" # Hex for 'rTRC' + header_offset_position = 50 + original_contents = bytearray(self.reader.file_contents) + + # Call the method under test + new_contents = self.reader.process_trc_tag(trc_hex, "rTRC", original_contents, header_offset_position) + + # Assert that the appropriate debug message was logged + expected_message = "Could not extract the full 12-byte 'rTRC' tag entry."
+ self.reader.logger.debug.assert_any_call(expected_message) + + # Assert that new_contents is unchanged + self.assertEqual(new_contents, original_contents) + + # Test for process_trc_tag: trc_hex not found in new_contents + def test_process_trc_tag_trc_hex_not_found(self): + # Prepare the test data for when trc_hex is not found + trc_hex = b"\x72\x54\x52\x43" # Hex value not present in new_contents + trc_name = "rTRC" + new_contents = bytearray(b"\x00" * 100) # Sample contents without trc_hex + header_offset_position = 50 + + # Call process_trc_tag and expect no modifications to new_contents + result = self.reader.process_trc_tag(trc_hex, trc_name, new_contents, header_offset_position) + + # Check that the function returned the original new_contents + self.assertEqual(result, new_contents) + + # Verify that the correct debug message was logged + self.reader.logger.debug.assert_any_call(f"'{trc_name}' not found in the file.") + + # Test for process_trc_tag: header_offset_position is None + def test_process_trc_tag_header_offset_none(self): + # Prepare the test data where header_offset_position is None + trc_hex = b"\x72\x54\x52\x43" # Hex value found in new_contents + trc_name = "rTRC" + new_contents = bytearray(b"\x00" * 50 + trc_hex + b"\x00" * 50) + header_offset_position = None # Simulate unrecognized meth value + + # Call process_trc_tag and expect no modifications to new_contents + result = self.reader.process_trc_tag(trc_hex, trc_name, new_contents, header_offset_position) + + # Check that the function returned the original new_contents + self.assertEqual(result, new_contents) + + # Verify that the correct debug message was logged + self.reader.logger.debug.assert_any_call( + f"Cannot calculate 'curv_{trc_name}_position' due to an unrecognized 'meth' value." 
+ ) + + # Test for read_jp2_file method when file_contents is valid + def test_read_jp2_file(self): + # Prepare the test data with valid file contents + self.reader.file_contents = b"Valid JP2 content" + + # Mock dependent methods and attributes + with patch.object(self.reader, 'initialize_validator') as mock_initialize_validator, \ + patch.object(self.reader, 'validator') as mock_validator, \ + patch.object(self.reader, 'check_boxes') as mock_check_boxes, \ + patch.object(self.reader, 'process_all_trc_tags') as mock_process_all_trc_tags, \ + patch.object(self.reader, 'write_modified_file') as mock_write_modified_file: + + # Set up the mock for validator._isValid() + mock_validator._isValid.return_value = True + + # Set up return values for other methods + mock_check_boxes.return_value = 100 # Example header_offset_position + mock_process_all_trc_tags.return_value = b"Modified JP2 content" + + # Call the method under test + self.reader.read_jp2_file() + + # Assert that initialize_validator was called once + mock_initialize_validator.assert_called_once() + + # Assert that validator._isValid() was called once + mock_validator._isValid.assert_called_once() + + # Assert that logger.info was called with the expected message + self.reader.logger.info.assert_called_with("Is file valid? True") + + # Assert that check_boxes was called once + mock_check_boxes.assert_called_once() + + # Assert that process_all_trc_tags was called with the correct header_offset_position + mock_process_all_trc_tags.assert_called_once_with(100) + + # Assert that write_modified_file was called with the modified contents + mock_write_modified_file.assert_called_once_with(b"Modified JP2 content") + + # Test for read_jp2_file method when file_contents is None or empty + def test_read_jp2_file_no_file_contents(self): + # Set file_contents to None to simulate missing content + self.reader.file_contents = None + + # Mock dependent methods to ensure they are not called + with patch.object(self.reader, 'initialize_validator') as mock_initialize_validator, \ + patch.object(self.reader, 'check_boxes') as mock_check_boxes, \ + patch.object(self.reader, 'process_all_trc_tags') as mock_process_all_trc_tags, \ + patch.object(self.reader, 'write_modified_file') as mock_write_modified_file: + + # Call the method under test + self.reader.read_jp2_file() + + # Assert that the method returns early and dependent methods are not called + mock_initialize_validator.assert_not_called() + mock_check_boxes.assert_not_called() + mock_process_all_trc_tags.assert_not_called() + mock_write_modified_file.assert_not_called() + + # Test for process_s3_bucket function + @patch("jp2_remediator.box_reader.boto3.client") + @patch("jp2_remediator.box_reader.BoxReader") + @patch("builtins.print") + def test_process_s3_bucket(self, mock_print, mock_box_reader, mock_boto3_client): + # Set up the mock S3 client + mock_s3_client = MagicMock() + mock_boto3_client.return_value = mock_s3_client + + # Define the bucket name and prefix + bucket_name = "test-bucket" + prefix = "test-prefix" + + # Prepare a fake response for list_objects_v2 + mock_s3_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": "file1.jp2"}, + {"Key": "file2.jp2"}, + {"Key": "file3.txt"}, # Non-JP2 file to test filtering + ] + } + + # Mock download_file and upload_file methods + mock_s3_client.download_file.return_value = None + mock_s3_client.upload_file.return_value = None + + # Mock BoxReader instance and its read_jp2_file method + mock_reader_instance = MagicMock() +
mock_box_reader.return_value = mock_reader_instance + + # Call the method under test + process_s3_bucket(bucket_name, prefix) + + # Verify that list_objects_v2 was called with the correct parameters + mock_s3_client.list_objects_v2.assert_called_once_with(Bucket=bucket_name, Prefix=prefix) + + # Verify that download_file was called for each .jp2 file + expected_download_calls = [ + unittest.mock.call(bucket_name, "file1.jp2", "/tmp/file1.jp2"), + unittest.mock.call(bucket_name, "file2.jp2", "/tmp/file2.jp2"), + ] + self.assertEqual(mock_s3_client.download_file.call_args_list, expected_download_calls) + + # Verify that BoxReader was instantiated with the correct download paths + expected_boxreader_calls = [ + unittest.mock.call("/tmp/file1.jp2"), + unittest.mock.call("/tmp/file2.jp2"), + ] + self.assertEqual(mock_box_reader.call_args_list, expected_boxreader_calls) + + # Verify that read_jp2_file was called for each .jp2 file + self.assertEqual(mock_reader_instance.read_jp2_file.call_count, 2) + + # Verify that upload_file was called for each .jp2 file + upload_calls = mock_s3_client.upload_file.call_args_list + self.assertEqual(len(upload_calls), 2) + for call in upload_calls: + args, _ = call + local_file_path = args[0] + upload_bucket = args[1] + upload_key = args[2] + # Check that the local file path includes '_modified_' and ends with '.jp2' + self.assertIn("_modified_", local_file_path) + self.assertTrue(local_file_path.endswith(".jp2")) + # Check that the upload is to the correct bucket and key + self.assertEqual(upload_bucket, bucket_name) + self.assertIn("_modified_", upload_key) + self.assertTrue(upload_key.endswith(".jp2")) + + # Verify that print was called correctly + expected_print_calls = [ + unittest.mock.call(f"Processing file: file1.jp2 from bucket {bucket_name}"), + unittest.mock.call(f"Processing file: file2.jp2 from bucket {bucket_name}"), + ] + mock_print.assert_has_calls(expected_print_calls, any_order=True) + + # Test for process_trc_tag: when trc_tag_size != curv_trc_field_length + def test_process_trc_tag_size_mismatch(self): + # Prepare test data where trc_tag_size does not match curv_trc_field_length + trc_hex = b'\x72\x54\x52\x43' # Hex for 'rTRC' + trc_name = 'rTRC' + trc_position = 10 # Arbitrary position where trc_hex is found in new_contents + + # Set trc_tag_offset and trc_tag_size with values that will cause a mismatch + trc_tag_offset = 50 # Arbitrary offset value + trc_tag_size = 20 # Set intentionally different from curv_trc_field_length + + # Build the trc_tag_entry (12 bytes): signature + offset + size + trc_tag_entry = trc_hex + trc_tag_offset.to_bytes(4, 'big') + trc_tag_size.to_bytes(4, 'big') + + # Prepare new_contents with the trc_tag_entry at trc_position + new_contents = bytearray(b'\x00' * trc_position + trc_tag_entry + b'\x00' * 200) + + # Set header_offset_position to a valid integer + header_offset_position = 5 # Arbitrary valid value + + # Prepare curv_profile data with curv_trc_gamma_n such that curv_trc_field_length != trc_tag_size + curv_trc_gamma_n = 1 # Set gamma_n to 1 + curv_trc_field_length = curv_trc_gamma_n * 2 + 12 # Calculates to 14 + + # Build curv_profile (12 bytes): signature + reserved + gamma_n + curv_signature = b'curv' # Signature 'curv' + curv_reserved = (0).to_bytes(4, 'big') # Reserved bytes set to zero + curv_trc_gamma_n_bytes = curv_trc_gamma_n.to_bytes(4, 'big') + curv_profile = curv_signature + curv_reserved + curv_trc_gamma_n_bytes + + # Calculate curv_trc_position based on trc_tag_offset and
header_offset_position + curv_trc_position = trc_tag_offset + header_offset_position + + # Ensure new_contents is large enough to hold the curv_profile at the calculated position + required_length = curv_trc_position + len(curv_profile) + if len(new_contents) < required_length: + new_contents.extend(b'\x00' * (required_length - len(new_contents))) + + # Insert curv_profile into new_contents at curv_trc_position + new_contents[curv_trc_position:curv_trc_position + len(curv_profile)] = curv_profile + + # Mock the logger to capture warnings + self.reader.logger = MagicMock() + + # Call the method under test + result_contents = self.reader.process_trc_tag(trc_hex, trc_name, new_contents, header_offset_position) + + # Verify that the trc_tag_size in new_contents was updated to curv_trc_field_length + updated_trc_tag_size_bytes = result_contents[trc_position + 8: trc_position + 12] + updated_trc_tag_size = int.from_bytes(updated_trc_tag_size_bytes, 'big') + self.assertEqual(updated_trc_tag_size, curv_trc_field_length) + + # Verify that the appropriate warning was logged + expected_warning = f"""'{trc_name}' Tag Size ({trc_tag_size}) does not match 'curv_{trc_name}_field_length' ({ + curv_trc_field_length}). Modifying the size...""" + self.reader.logger.warning.assert_any_call(expected_warning) + if __name__ == "__main__": unittest.main()
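A minimal usage sketch of the logging setup this patch introduces (the APP_LOG_LEVEL, CONSOLE_LOGGING_ONLY, and LOG_DIR environment variables and their defaults are the ones defined in src/jp2_remediator/__init__.py; the sample image path is taken from the README examples above):

import os

# configure_logger reads these variables when it is called, so set them
# before constructing a BoxReader (which configures its own logger).
os.environ["APP_LOG_LEVEL"] = "DEBUG"  # default is WARNING
os.environ["CONSOLE_LOGGING_ONLY"] = "false"  # also write logs/jp2_remediator.log

from jp2_remediator.box_reader import BoxReader

reader = BoxReader("tests/test-images/7514499.jp2")
reader.read_jp2_file()  # creates <name>_modified_<YYYYMMDD>.jp2 only if changes were made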