# \a\b\n\r\t\v
# MdR: took out '<' and '>' out of _ordinary because they were converted to
# entities <>
# MdR: moved '!' from _ordinary to _special because it means "NOT" in the regex
# world. At this time no regex in any sig has a negate set, did this to be on
# the safe side
ORDINARY = frozenset(
    " \"#%&',-/0123456789:;=@ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~"
)
SPECIAL = "$()*+.?![]^\\{|}"  # Before: '$*+.?![]^\\{|}'
HEX = "0123456789abcdef"


def escape_char(c):
    """Add appropriate escape sequence to passed character c.

    Args:
        c: a string holding exactly one character.

    Returns:
        ``\\n`` / ``\\r`` for newline and carriage return, a backslash-prefixed
        character for members of ``SPECIAL``, ``\\xhh`` for any other code
        point up to 0xFF, and ``\\uhhhh`` / ``\\Uhhhhhhhh`` for larger code
        points.

    Raises:
        TypeError: if ``c`` is not a single character (raised by ``ord``).
    """
    # ord() doubles as validation: it rejects "" and multi-character strings
    # before the substring test against SPECIAL could match them by accident.
    code = ord(c)
    if c == "\n":
        return "\\n"
    if c == "\r":
        return "\\r"
    if c in SPECIAL:
        return "\\" + c
    # Two hex digits can only express 0x00-0xFF; wider code points need the
    # longer escape forms instead of indexing past the end of HEX.
    if code > 0xFFFF:
        return "\\U{:08x}".format(code)
    if code > 0xFF:
        return "\\u{:04x}".format(code)
    high, low = divmod(code, 16)
    return "\\x" + HEX[high] + HEX[low]


def escape(string):
    """Escape characters in pattern that are non-printable, non-ascii, or special for regexes.

    Characters in ``ORDINARY`` are copied through unchanged; every other
    character is replaced by the result of ``escape_char``.
    """
    return "".join(c if c in ORDINARY else escape_char(c) for c in string)
information") - parser.add_argument("-q", default=False, action="store_true", help="run (more) quietly") - parser.add_argument("-recurse", default=False, action="store_true", help="recurse into subdirectories") - parser.add_argument("-zip", default=False, action="store_true", help="recurse into zip and tar files") + parser.add_argument( + "-v", default=False, action="store_true", help="show version information" + ) + parser.add_argument( + "-q", default=False, action="store_true", help="run (more) quietly" + ) + parser.add_argument( + "-recurse", + default=False, + action="store_true", + help="recurse into subdirectories", + ) + parser.add_argument( + "-zip", + default=False, + action="store_true", + help="recurse into zip and tar files", + ) parser.add_argument( "-noextension", default=False, @@ -44,7 +58,9 @@ def parse_cli_args(argv: List[str], defaults: Dict[str, Any]) -> argparse.Namesp group = parser.add_mutually_exclusive_group() group.add_argument( - "-input", default=False, help="file containing a list of files to check, one per line. - means stdin" + "-input", + default=False, + help="file containing a list of files to check, one per line. - means stdin", ) group.add_argument( "files", @@ -54,7 +70,9 @@ def parse_cli_args(argv: List[str], defaults: Dict[str, Any]) -> argparse.Namesp help="files to check. If the file is -, then read content from stdin. 
# This is updated following PR #191: FIX: Develop out FIDO tests with pytest
# It should fix a problem that streams (not files) would hang.
# Needs thorough testing, though.
def blocking_read(self, file, bytes_to_read):
    """Perform a blocking read and return the buffer.

    Calls ``file.read`` repeatedly until ``bytes_to_read`` bytes have been
    collected or a read returns nothing, which is treated as EOF.

    Args:
        file: object exposing ``read(n)`` that yields bytes.
        bytes_to_read: maximum number of bytes to gather.

    Returns:
        The bytes read, possibly fewer than ``bytes_to_read`` when the
        stream ends early; ``b""`` when nothing could be read.
    """
    chunks = []
    remaining = bytes_to_read
    while remaining > 0:
        chunk = file.read(remaining)
        # An empty (or None) result means no more data: stop instead of
        # looping forever on a stream that will never fill the request.
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    # Join once at the end so short reads do not recopy the whole buffer.
    return b"".join(chunks)
get_text_tna( + id, "Identifier" + ) # Handle the relationships for x in pronom_format.findall(TNA("RelatedFormat")): rel = get_text_tna(x, "RelationshipType") if rel == "Has priority over": - ET.SubElement(fido_format, "has_priority_over").text = get_text_tna(x, "RelatedFormatID") + ET.SubElement(fido_format, "has_priority_over").text = get_text_tna( + x, "RelatedFormatID" + ) # Get the InternalSignature information for pronom_sig in pronom_format.findall(TNA("InternalSignature")): fido_sig = ET.SubElement(fido_format, "signature") - ET.SubElement(fido_sig, "name").text = get_text_tna(pronom_sig, "SignatureName") + ET.SubElement(fido_sig, "name").text = get_text_tna( + pronom_sig, "SignatureName" + ) # There are some funny chars in the notes, which caused me trouble and it is a unicode string, - ET.SubElement(fido_sig, "note").text = get_text_tna(pronom_sig, "SignatureNote") + ET.SubElement(fido_sig, "note").text = get_text_tna( + pronom_sig, "SignatureNote" + ) for pronom_pat in pronom_sig.findall(TNA("ByteSequence")): # print('Parsing ID:{}'.format(puid)) fido_pat = ET.SubElement(fido_sig, "pattern") @@ -228,15 +249,24 @@ def parse_pronom_xml(self, source, puid_filter=None): pass # print "working on puid:", puid, ", position: ", pos, "with offset, maxoffset: ", offset, ",", max_offset try: - regex = convert_to_regex(byte_seq, "Little", pos, offset, max_offset) + regex = convert_to_regex( + byte_seq, "Little", pos, offset, max_offset + ) except ValueError as ve: - print("ValueError converting PUID {} signature to regex: {}".format(puid, ve), file=sys.stderr) + print( + "ValueError converting PUID {} signature to regex: {}".format( + puid, ve + ), + file=sys.stderr, + ) regex = FLG_INCOMPATIBLE # print "done puid", puid if regex == FLG_INCOMPATIBLE: print( - "Error: incompatible PRONOM signature found for puid {} skipping...".format(puid), + "Error: incompatible PRONOM signature found for puid {} skipping...".format( + puid + ), file=sys.stderr, ) # remove the 
empty 'signature' nodes @@ -250,47 +280,73 @@ def parse_pronom_xml(self, source, puid_filter=None): ET.SubElement(fido_pat, "regex").text = regex # Get the format details fido_details = ET.SubElement(fido_format, "details") - ET.SubElement(fido_details, "dc:description").text = get_text_tna(pronom_format, "FormatDescription") - ET.SubElement(fido_details, "dcterms:available").text = get_text_tna(pronom_format, "ReleaseDate") - ET.SubElement(fido_details, "dc:creator").text = get_text_tna(pronom_format, "Developers/DeveloperCompoundName") + ET.SubElement(fido_details, "dc:description").text = get_text_tna( + pronom_format, "FormatDescription" + ) + ET.SubElement(fido_details, "dcterms:available").text = get_text_tna( + pronom_format, "ReleaseDate" + ) + ET.SubElement(fido_details, "dc:creator").text = get_text_tna( + pronom_format, "Developers/DeveloperCompoundName" + ) ET.SubElement(fido_details, "dcterms:publisher").text = get_text_tna( pronom_format, "Developers/OrganisationName" ) for x in pronom_format.findall(TNA("RelatedFormat")): rel = get_text_tna(x, "RelationshipType") if rel == "Is supertype of": - ET.SubElement(fido_details, "is_supertype_of").text = get_text_tna(x, "RelatedFormatID") + ET.SubElement(fido_details, "is_supertype_of").text = get_text_tna( + x, "RelatedFormatID" + ) for x in pronom_format.findall(TNA("RelatedFormat")): rel = get_text_tna(x, "RelationshipType") if rel == "Is subtype of": - ET.SubElement(fido_details, "is_subtype_of").text = get_text_tna(x, "RelatedFormatID") - ET.SubElement(fido_details, "content_type").text = get_text_tna(pronom_format, "FormatTypes") + ET.SubElement(fido_details, "is_subtype_of").text = get_text_tna( + x, "RelatedFormatID" + ) + ET.SubElement(fido_details, "content_type").text = get_text_tna( + pronom_format, "FormatTypes" + ) # References for x in pronom_format.findall(TNA("Document")): r = ET.SubElement(fido_details, "reference") ET.SubElement(r, "dc:title").text = get_text_tna(x, "TitleText") - 
ET.SubElement(r, "dc:creator").text = get_text_tna(x, "Author/AuthorCompoundName") - ET.SubElement(r, "dc:publisher").text = get_text_tna(x, "Publisher/PublisherCompoundName") - ET.SubElement(r, "dcterms:available").text = get_text_tna(x, "PublicationDate") + ET.SubElement(r, "dc:creator").text = get_text_tna( + x, "Author/AuthorCompoundName" + ) + ET.SubElement(r, "dc:publisher").text = get_text_tna( + x, "Publisher/PublisherCompoundName" + ) + ET.SubElement(r, "dcterms:available").text = get_text_tna( + x, "PublicationDate" + ) for id in x.findall(TNA("DocumentIdentifier")): type = get_text_tna(id, "IdentifierType") if type == "URL": - ET.SubElement(r, "dc:identifier").text = "http://" + get_text_tna(id, "Identifier") + ET.SubElement(r, "dc:identifier").text = "http://" + get_text_tna( + id, "Identifier" + ) else: ET.SubElement(r, "dc:identifier").text = ( - get_text_tna(id, "IdentifierType") + ":" + get_text_tna(id, "Identifier") + get_text_tna(id, "IdentifierType") + + ":" + + get_text_tna(id, "Identifier") ) ET.SubElement(r, "dc:description").text = get_text_tna(x, "DocumentNote") ET.SubElement(r, "dc:type").text = get_text_tna(x, "DocumentType") ET.SubElement(r, "dcterms:license").text = ( - get_text_tna(x, "AvailabilityDescription") + " " + get_text_tna(x, "AvailabilityNote") + get_text_tna(x, "AvailabilityDescription") + + " " + + get_text_tna(x, "AvailabilityNote") ) ET.SubElement(r, "dc:rights").text = get_text_tna(x, "DocumentIPR") # Examples for x in pronom_format.findall(TNA("ReferenceFile")): rf = ET.SubElement(fido_details, "example_file") ET.SubElement(rf, "dc:title").text = get_text_tna(x, "ReferenceFileName") - ET.SubElement(rf, "dc:description").text = get_text_tna(x, "ReferenceFileDescription") + ET.SubElement(rf, "dc:description").text = get_text_tna( + x, "ReferenceFileDescription" + ) checksum = "" for id in x.findall(TNA("ReferenceFileIdentifier")): type = get_text_tna(id, "IdentifierType") @@ -308,14 +364,20 @@ def parse_pronom_xml(self, 
source, puid_filter=None): m.update(sock.read()) sock.close() except HTTPError as http_excep: - sys.stderr.write("HTTP {} error loading resource {}\n".format(http_excep.code, url)) + sys.stderr.write( + "HTTP {} error loading resource {}\n".format( + http_excep.code, url + ) + ) if http_excep.code == 404: continue checksum = m.hexdigest() else: ET.SubElement(rf, "dc:identifier").text = ( - get_text_tna(id, "IdentifierType") + ":" + get_text_tna(id, "Identifier") + get_text_tna(id, "IdentifierType") + + ":" + + get_text_tna(id, "Identifier") ) ET.SubElement(rf, "dcterms:license").text = "" ET.SubElement(rf, "dc:rights").text = get_text_tna(x, "ReferenceFileIPR") @@ -325,10 +387,18 @@ def parse_pronom_xml(self, source, puid_filter=None): # Record Metadata md = ET.SubElement(fido_details, "record_metadata") ET.SubElement(md, "status").text = "unknown" - ET.SubElement(md, "dc:creator").text = get_text_tna(pronom_format, "ProvenanceName") - ET.SubElement(md, "dcterms:created").text = get_text_tna(pronom_format, "ProvenanceSourceDate") - ET.SubElement(md, "dcterms:modified").text = get_text_tna(pronom_format, "LastUpdatedDate") - ET.SubElement(md, "dc:description").text = get_text_tna(pronom_format, "ProvenanceDescription") + ET.SubElement(md, "dc:creator").text = get_text_tna( + pronom_format, "ProvenanceName" + ) + ET.SubElement(md, "dcterms:created").text = get_text_tna( + pronom_format, "ProvenanceSourceDate" + ) + ET.SubElement(md, "dcterms:modified").text = get_text_tna( + pronom_format, "LastUpdatedDate" + ) + ET.SubElement(md, "dc:description").text = get_text_tna( + pronom_format, "ProvenanceDescription" + ) return fido_format # FIXME: I don't think that this quite works yet! 
def do_all_bitmasks(chars, i, littleendian):
    """Match bytes that have every bit of the mask set: (byte & bitmask) == bitmask."""

    def has_all_bits(byt, bitmask):
        return (byt & bitmask) == bitmask

    return do_any_all_bitmasks(chars, i, has_all_bits, littleendian)


def do_any_bitmasks(chars, i, littleendian):
    """Match bytes that share at least one bit with the mask: (byte & bitmask) != 0."""

    def has_any_bit(byt, bitmask):
        return (byt & bitmask) != 0

    return do_any_all_bitmasks(chars, i, has_any_bit, littleendian)
endianness="", pos="BOF", offset="0", maxoffset=""): elif chars[i] == "]": break else: - raise Exception(_convert_err_msg("Illegal character in non-match", chars[i], i, chars, buf)) + raise Exception( + _convert_err_msg( + "Illegal character in non-match", chars[i], i, chars, buf + ) + ) buf.write(")") i += 1 state = "start" @@ -624,7 +714,11 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): buf.write("]") i += 1 except Exception: - print(_convert_err_msg("Illegal character in bracket", chars[i], i, chars, buf)) + print( + _convert_err_msg( + "Illegal character in bracket", chars[i], i, chars, buf + ) + ) raise if i < len(chars) and chars[i] == "{": state = "curly-after-bracket" @@ -667,7 +761,9 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): else: raise Exception( _convert_err_msg( - ("Current state = '{0}' : Illegal character in paren").format(state), + ( + "Current state = '{0}' : Illegal character in paren" + ).format(state), chars[i], i, chars, @@ -700,7 +796,11 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): elif chars[i] == "}": break else: - raise Exception(_convert_err_msg("Illegal character in curly", chars[i], i, chars, buf)) + raise Exception( + _convert_err_msg( + "Illegal character in curly", chars[i], i, chars, buf + ) + ) buf.write("}") i += 1 # skip the ) state = "start" @@ -713,7 +813,11 @@ def convert_to_regex(chars, endianness="", pos="BOF", offset="0", maxoffset=""): i += 1 elif chars[i] == "?": if chars[i + 1] != "?": - raise Exception(_convert_err_msg("Illegal character after ?", chars[i + 1], i + 1, chars, buf)) + raise Exception( + _convert_err_msg( + "Illegal character after ?", chars[i + 1], i + 1, chars, buf + ) + ) buf.write(".?") i += 2 state = "start" @@ -741,7 +845,10 @@ def run(input=None, output=None, puid=None): info = FormatInfo(input) info.load_pronom_xml(puid) info.save(output) - print("Converted {0} PRONOM formats to FIDO 
def get_sig_xml_for_puid(puid):
    """Return the full PRONOM signature XML for the passed PUID.

    Args:
        puid: PUID substituted into the PRONOM URL path.

    Returns:
        The response body exactly as returned by ``response.read()``.
    """
    req = urllib.request.Request(
        "http://www.nationalarchives.gov.uk/pronom/{}.xml".format(puid)
    )
    # Close the HTTP response deterministically instead of leaving it to the
    # garbage collector.
    with urllib.request.urlopen(req) as response:
        return response.read()
root_ele.findall(".//{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat") + root_ele.findall( + ".//{http://www.nationalarchives.gov.uk/pronom/SignatureFile}FileFormat" + ) ) except HTTPError as httpe: sys.stderr.write( @@ -111,9 +117,15 @@ def _get_soap_ele_tree(soap_action): def _get_soap_response(soap_action, soap_string): try: - req = urllib.request.Request("http://{}/pronom/service.asmx".format(PRONOM_HOST), data=soap_string) + req = urllib.request.Request( + "http://{}/pronom/service.asmx".format(PRONOM_HOST), data=soap_string + ) except URLError: - print("There was a problem contacting the PRONOM service at http://{}/pronom/service.asmx.".format(PRONOM_HOST)) + print( + "There was a problem contacting the PRONOM service at http://{}/pronom/service.asmx.".format( + PRONOM_HOST + ) + ) print("Please check your network connection and try again.") sys.exit(1) for key, value in HEADERS.items(): diff --git a/fido/update_signatures.py b/fido/update_signatures.py index 43a2d68..919dfad 100644 --- a/fido/update_signatures.py +++ b/fido/update_signatures.py @@ -23,7 +23,12 @@ from . import CONFIG_DIR, __version__ from .prepare import run as prepare_pronom_to_fido -from .pronom.soap import NS, get_droid_signatures, get_pronom_sig_version, get_sig_xml_for_puid +from .pronom.soap import ( + NS, + get_droid_signatures, + get_pronom_sig_version, + get_sig_xml_for_puid, +) from .versions import get_local_versions ABORT_MSG = "Aborting update..." @@ -112,7 +117,9 @@ def sig_version_check(version="latest"): print("Getting latest version number from PRONOM...") version = get_pronom_sig_version() if not version: - sys.exit("Failed to obtain PRONOM signature file version number, please try again.") + sys.exit( + "Failed to obtain PRONOM signature file version number, please try again." 
+ ) print("Querying PRONOM for signaturefile version {}.".format(version)) sig_file_name = _sig_file_name(version) @@ -152,7 +159,9 @@ def init_sig_download(defaults): resume = False if os.path.isdir(tmpdir): print("Found previously created temporary folder for download:", tmpdir) - resume = query_yes_no("Do you want to resume download (yes) or start over (no)?") + resume = query_yes_no( + "Do you want to resume download (yes) or start over (no)?" + ) if resume: print("Resuming download...") else: @@ -162,7 +171,9 @@ def init_sig_download(defaults): except OSError: pass if not os.path.isdir(tmpdir): - sys.stderr.write("Failed to create temporary folder for PUID's, using: " + tmpdir) + sys.stderr.write( + "Failed to create temporary folder for PUID's, using: " + tmpdir + ) return tmpdir, resume @@ -176,7 +187,10 @@ def download_signatures(defaults, format_eles, resume, tmpdir): download_sig(format_ele, tmpdir, resume, defaults) numfiles += 1 print( - r"Downloaded {}/{} files [{}%]".format(numfiles, puid_count, int(float(numfiles) / one_percent)), end="\r" + r"Downloaded {}/{} files [{}%]".format( + numfiles, puid_count, int(float(numfiles) / one_percent) + ), + end="\r", ) print("100%") @@ -208,7 +222,10 @@ def create_zip_file(defaults, format_eles, version, tmpdir): print("Creating PRONOM zip...") compression = zipfile.ZIP_DEFLATED if "zlib" in sys.modules else zipfile.ZIP_STORED modes = {zipfile.ZIP_DEFLATED: "deflated", zipfile.ZIP_STORED: "stored"} - zf = zipfile.ZipFile(os.path.join(CONFIG_DIR, DEFAULTS["pronomZipFileName"].format(version)), mode="w") + zf = zipfile.ZipFile( + os.path.join(CONFIG_DIR, DEFAULTS["pronomZipFileName"].format(version)), + mode="w", + ) print("Adding files with compression mode", modes[compression]) for format_ele in format_eles: _, puid_filename = get_puid_file_name(format_ele) @@ -241,8 +258,15 @@ def update_versions_xml(version): def main(): """Main CLI entrypoint.""" - parser = ArgumentParser(description="Download and convert the 
def get_zip_file(self):
    """Build the path of the PRONOM XML Zip file for the current PRONOM version.

    The file name is derived from ``self.pronom_version`` and joined onto
    ``self.conf_dir``.
    """
    zip_name = "pronom-xml-v{}.zip".format(self.pronom_version)
    return os.path.join(self.conf_dir, zip_name)
def _version_check(sig_ver, update_url, timeout=30):
    """Ask the update service whether a newer signature version exists.

    Fetches ``update_url + "format/latest/"``, reads the ``version``
    attribute of the XML root element and compares it numerically with
    ``sig_ver``.

    Args:
        sig_ver: the currently installed signature version (int-convertible).
        update_url: base URL of the update service.
        timeout: seconds to wait for the HTTP request before giving up.

    Returns:
        A tuple ``(is_new, latest)`` where ``is_new`` is True when the remote
        version is greater than ``sig_ver`` and ``latest`` is the value
        returned by ``_get_version``.

    Exits the process via ``sys.exit`` when the HTTP status is not 200 or the
    response carries no ``version`` attribute.
    """
    # Without a timeout a stalled server would block the CLI indefinitely.
    resp = requests.get(update_url + "format/latest/", timeout=timeout)
    if resp.status_code != 200:
        sys.exit(
            "Error getting latest version info: HTTP Status {}".format(resp.status_code)
        )
    root_ele = ET.fromstring(resp.text)
    remote_version = root_ele.get("version")
    if remote_version is None:
        # _get_version runs a regex over its argument and cannot accept None.
        sys.exit("Error getting latest version info: response has no version attribute")
    latest = _get_version(remote_version)
    return int(latest) > int(sig_ver), latest
def _write_sigs(latest, update_url, type, name_template): - sig_out = str(importlib.resources.files("fido").joinpath("conf", name_template.format(latest))) + sig_out = str( + importlib.resources.files("fido").joinpath("conf", name_template.format(latest)) + ) if os.path.exists(sig_out): return resp = requests.get(update_url + "format/{0}/{1}/".format(latest, type)) diff --git a/pyproject.toml b/pyproject.toml index 9aa0996..07ef2c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,6 @@ classifiers = [ dependencies = [ "olefile >= 0.46, < 1", "requests", - "flake8", ] [project.urls] @@ -41,6 +40,7 @@ homepage = "http://openpreservation.org/technology/products/fido/" testing = [ "pytest", "pytest-cov", + "flake8", ] [project.scripts] @@ -66,5 +66,8 @@ addopts = "--maxfail=1 --strict-markers" [tool.flake8] exclude = ['.venv'] ignore = ['E231', 'E241', 'E501', 'W503', 'E203'] -max-line-length = 130 -# count = true \ No newline at end of file +max-line-length = 120 +# count = true + +[tool.ruff] +line-length = 120 \ No newline at end of file diff --git a/tests/pronom/test_soap.py b/tests/pronom/test_soap.py index daea45a..cb6509d 100644 --- a/tests/pronom/test_soap.py +++ b/tests/pronom/test_soap.py @@ -32,5 +32,5 @@ def test_pronom_signature(): """Test that retrieving signatures gets something with length and no errors are thrown.""" version = soap.get_pronom_sig_version() xml, count = soap.get_droid_signatures(version) - assert len(xml) > 1000, 'Expected more than 1000 XML lines, got %s' % len(xml) - assert count > 1000, 'Expected more than 1000 signatures, got %s' % count + assert len(xml) > 1000, "Expected more than 1000 XML lines, got %s" % len(xml) + assert count > 1000, "Expected more than 1000 signatures, got %s" % count diff --git a/tests/test_fido.py b/tests/test_fido.py index 952a588..e0c256e 100644 --- a/tests/test_fido.py +++ b/tests/test_fido.py @@ -1,13 +1,86 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import csv +import io from time 
import sleep -from fido.fido import PerfTimer +import pytest + +from fido.fido import Fido, PerfTimer def test_perf_timer(): timer = PerfTimer() - sleep(3.6) + sleep(0.2) duration = timer.duration() assert duration > 0 + + +id_test_data = [(b"\x5a\x58\x54\x61\x70\x65\x21\x1a\x01", "fmt/1000", "OK")] + + +@pytest.mark.parametrize( + "magic, expected_puid, expected_result", + id_test_data, + # Add additional test cases here +) +def test_file_identification(tmp_path, capsys, magic: bytes, expected_puid: str, expected_result: str): + """Reference for Fido-based format identification + 1. Create a byte-stream with a known magic number and serialize to tempfile. + 2. Call identify_file(...) to identify the file against Fido's known formats. + """ + # Create a temporary file and write our skeleton file out to it. + tmp_file = tmp_path / "tmp_file" + tmp_file.write_bytes(magic) + + # Create a Fido instance and call identify_file. The identify_file function + # will create and manage a file for itself. + f = Fido() + f.identify_file(str(tmp_file)) + + # Capture the stdout returned by Fido and make assertions about its + # validity. + captured = capsys.readouterr() + # TODO: there is a signature that generates an error + # min repeat greater than max repeat at position 8 + # assert captured.err == "" + reader = csv.reader(io.StringIO(captured.out), delimiter=",") + assert reader is not None + row = next(reader) + assert row[0] == expected_result, "row hasn't returned a positive identification" + assert row[2] == expected_puid, "row doesn't contain expected PUID value" + assert int(row[5]) == len(magic), "row doesn't contain stream length" + + +@pytest.mark.parametrize( + "magic, expected_puid, expected_result", + id_test_data, + # Add additional test cases here +) +def test_stream_identification(capsys, magic: bytes, expected_puid: str, expected_result: str): + """Reference for Fido-based format identification + 1. Create a byte-stream with a known magic number. + 2. 
Call identify_stream(...) to identify the file against Fido's known formats. + """ + # Create the stream object with the known magic-number. + fstream = io.BytesIO(magic) + + # Create a Fido instance and call identify_stream. The identify_stream function + # will work on the stream as-is. This could be an open file handle that the + # caller is managing for itself. + f = Fido() + f.identify_stream(fstream, "filename to display", extension=False) + + # Capture the stdout returned by Fido and make assertions about its + # validity. + captured = capsys.readouterr() + # TODO: as above, there is a signature that outputs an error + # min repeat greater than max repeat at position 8 + # assert captured.err == "" + reader = csv.reader(io.StringIO(captured.out), delimiter=",") + assert reader is not None + row = next(reader) + assert row[0] == expected_result, "row hasn't returned a positive identification" + assert row[2] == expected_puid, "row doesn't contain expected PUID value" + assert int(row[5]) == len(magic), "row doesn't contain stream length" diff --git a/tests/test_package.py b/tests/test_package.py index b4123cd..534a1f7 100644 --- a/tests/test_package.py +++ b/tests/test_package.py @@ -4,11 +4,15 @@ from fido.package import ZipPackage -TEST_DATA_BAD_PACKAGES = os.path.normpath(os.path.join(__file__, "..", "test_data/hard_packages")) +TEST_DATA_BAD_PACKAGES = os.path.normpath( + os.path.join(__file__, "..", "test_data/hard_packages") +) # None of these files should be identified as packages? 
-@pytest.mark.parametrize("filename", ["bad.zip", "worse.zip", "unicode.zip", "foo.zip", "foo.tar"]) +@pytest.mark.parametrize( + "filename", ["bad.zip", "worse.zip", "unicode.zip", "foo.zip", "foo.tar"] +) def test_bad_zip(filename): p = ZipPackage(os.path.join(TEST_DATA_BAD_PACKAGES, filename), {}) r = p.detect_formats() diff --git a/tests/test_prepare.py b/tests/test_prepare.py index f843f23..752fcd3 100644 --- a/tests/test_prepare.py +++ b/tests/test_prepare.py @@ -15,43 +15,40 @@ def binrep_convert(byt): @pytest.mark.parametrize( - ('pronom_bytesequence', 'matches_predicate'), + ("pronom_bytesequence", "matches_predicate"), ( # ANY BITMASKS, e.g., ~FF # ~07 = 00000111. Match bytes with any of the first three bits set. - ('~07', lambda binrep: '1' in binrep[-3:]), + ("~07", lambda binrep: "1" in binrep[-3:]), # ~7f = 01111111. Match bytes with any of the first seven bits set. - ('~7f', lambda binrep: '1' in binrep[-7:]), + ("~7f", lambda binrep: "1" in binrep[-7:]), # ~00 = 00000000. Match no bytes. # TODO: is it possible to write a regular expression that matches no # bytes? The regex pattern returned here matches ANY byte... - ('~00', lambda binrep: True), - + ("~00", lambda binrep: True), # NEGATED ANY BITMASKS, e.g., [!~FF] # [!~80] = 10000000. Match bytes without the last bit set. - ('[!~80]', lambda binrep: binrep.startswith('0')), + ("[!~80]", lambda binrep: binrep.startswith("0")), # [!~ff] = 11111111. Match bytes without any of the bitmask bits set. - ('[!~ff]', lambda binrep: binrep == '00000000'), + ("[!~ff]", lambda binrep: binrep == "00000000"), # [!~87] = 10000111. - ('[!~87]', lambda br: br.startswith('0') and br.endswith('000')), - + ("[!~87]", lambda br: br.startswith("0") and br.endswith("000")), # ALL BITMASKS, e.g., &FF # &07 = 00000111. Match bytes with all first three bits set. - ('&07', lambda binrep: binrep.endswith('111')), + ("&07", lambda binrep: binrep.endswith("111")), # &7f = 01111111. Match bytes with all first seven bits set. 
- ('&7f', lambda binrep: binrep.endswith('1111111')), + ("&7f", lambda binrep: binrep.endswith("1111111")), # &00 = 00000000. Matches any byte. - ('&00', lambda binrep: True), - + ("&00", lambda binrep: True), # NEGATED ALL BITMASKS, e.g., [!&FF] # !&80 = 10000000. Match bytes without the last bit set. - ('[!&80]', lambda binrep: binrep.startswith('0')), + ("[!&80]", lambda binrep: binrep.startswith("0")), # !&87 = 10000111. Match all bytes that don't have the first three bits # set and the last bit set also. - ('[!&87]', lambda br: not (br.startswith('1') and br.endswith('111'))), + ("[!&87]", lambda br: not (br.startswith("1") and br.endswith("111"))), # !&ff = 11111111. Match all bytes except 255. - ('[!&ff]', lambda binrep: not binrep == '11111111'), - ) + ("[!&ff]", lambda binrep: not binrep == "11111111"), + ), ) def test_bitmasks(pronom_bytesequence, matches_predicate): patt = convert_to_regex(pronom_bytesequence) @@ -64,25 +61,21 @@ def test_bitmasks(pronom_bytesequence, matches_predicate): @pytest.mark.parametrize( - ('pronom_bytesequence', 'input_', 'matches_bool'), + ("pronom_bytesequence", "input_", "matches_bool"), ( # These are good: - ('ab{3}cd(01|02|03)~07ff', '\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF', True), - ('ab{3}cd(01|02|03)~07ff', '\xAB\xDD\xDD\xDD\xCD\x03\x11\xFF', True), - ('ab{3}cd(01|02|03)~07ff', '\xAB\xDD\xDD\xDD\xCD\x02\xFE\xFF', True), - + ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF", True), + ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x03\x11\xFF", True), + ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\xFE\xFF", True), # Bad because missing three anythings between AB and CD - ('ab{3}cd(01|02|03)~07ff', '\xAB\xDD\xDD\xCD\x02\x11\xFF', False), - + ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xCD\x02\x11\xFF", False), # Bad because not at start of string - ('ab{3}cd(01|02|03)~07ff', '\xDA\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF', False), - + ("ab{3}cd(01|02|03)~07ff", "\xDA\xAB\xDD\xDD\xDD\xCD\x02\x11\xFF", False), # Bad 
because 04 is not in (01|02|03) - ('ab{3}cd(01|02|03)~07ff', '\xAB\xDD\xDD\xDD\xCD\x04\x11\xFF', False), - + ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x04\x11\xFF", False), # Bad because 18 is not in ~07 - ('ab{3}cd(01|02|03)~07ff', '\xAB\xDD\xDD\xDD\xCD\x02\x18\xFF', False), - ) + ("ab{3}cd(01|02|03)~07ff", "\xAB\xDD\xDD\xDD\xCD\x02\x18\xFF", False), + ), ) def test_heterogenous_sequences(pronom_bytesequence, input_, matches_bool): """Tests potential PRONOM sequences in their fullness.