From 43338f568abdb16cfb9969e71f00b8dbffbf13af Mon Sep 17 00:00:00 2001 From: Stijn Vermeeren Date: Tue, 4 Jun 2024 16:28:28 +0200 Subject: [PATCH] use lines instead of raw text for finding coordinate keys --- src/stratigraphy/extract.py | 36 +---- src/stratigraphy/main.py | 4 +- .../util/coordinate_extraction.py | 135 +++++++++--------- src/stratigraphy/util/extract_text.py | 45 ++++++ 4 files changed, 125 insertions(+), 95 deletions(-) create mode 100644 src/stratigraphy/util/extract_text.py diff --git a/src/stratigraphy/extract.py b/src/stratigraphy/extract.py index 28b01485..5753f3b4 100644 --- a/src/stratigraphy/extract.py +++ b/src/stratigraphy/extract.py @@ -19,7 +19,7 @@ find_layer_identifier_column, find_layer_identifier_column_entries, ) -from stratigraphy.util.line import TextLine, TextWord +from stratigraphy.util.line import TextLine from stratigraphy.util.textblock import TextBlock, block_distance from stratigraphy.util.util import ( parse_and_remove_empty_predictions, @@ -30,13 +30,13 @@ logger = logging.getLogger(__name__) -def process_page(page: fitz.Page, geometric_lines, language: str, **params: dict) -> list[dict]: +def process_page(lines: list[TextLine], geometric_lines, language: str, **params: dict) -> list[dict]: """Process a single page of a pdf. Finds all descriptions and depth intervals on the page and matches them. Args: - page (fitz.Page): The page to process. + lines (list[TextLine]): all the text lines on the page. geometric_lines (list[Line]): The geometric lines of the page. language (str): The language of the page. **params (dict): Additional parameters for the matching pipeline. @@ -44,32 +44,6 @@ def process_page(page: fitz.Page, geometric_lines, language: str, **params: dict Returns: list[dict]: All list of the text of all description blocks. 
""" - words = [] - words_by_line = {} - for x0, y0, x1, y1, word, block_no, line_no, _word_no in fitz.utils.get_text(page, "words"): - rect = fitz.Rect(x0, y0, x1, y1) * page.rotation_matrix - text_word = TextWord(rect, word) - words.append(text_word) - key = f"{block_no}_{line_no}" - if key not in words_by_line: - words_by_line[key] = [] - words_by_line[key].append(text_word) - - raw_lines = [TextLine(words_by_line[key]) for key in words_by_line] - - lines = [] - current_line_words = [] - for line_index, raw_line in enumerate(raw_lines): - for word_index, word in enumerate(raw_line.words): - remaining_line = TextLine(raw_line.words[word_index:]) - if len(current_line_words) > 0 and remaining_line.is_line_start(lines, raw_lines[line_index + 1 :]): - lines.append(TextLine(current_line_words)) - current_line_words = [] - current_line_words.append(word) - if len(current_line_words): - lines.append(TextLine(current_line_words)) - current_line_words = [] - # Detect Layer Index Columns layer_identifier_entries = find_layer_identifier_column_entries(lines) layer_identifier_columns = ( @@ -84,10 +58,12 @@ def process_page(page: fitz.Page, geometric_lines, language: str, **params: dict if material_description_rect: pairs.append((layer_identifier_column, material_description_rect)) - # Obtain the best pair. In contrast do depth columns, there only ever is one layer index column per page. + # Obtain the best pair. In contrast to depth columns, there only ever is one layer index column per page. if pairs: pairs.sort(key=lambda pair: score_column_match(pair[0], pair[1])) + words = [word for line in lines for word in line.words] + # If there is a layer identifier column, then we use this directly. # Else, we search for depth columns. We could also think of some scoring mechanism to decide which one to use. 
if not pairs: diff --git a/src/stratigraphy/main.py b/src/stratigraphy/main.py index 7f863fff..ef2e36b3 100644 --- a/src/stratigraphy/main.py +++ b/src/stratigraphy/main.py @@ -16,6 +16,7 @@ from stratigraphy.util.coordinate_extraction import CoordinateExtractor from stratigraphy.util.draw import draw_predictions from stratigraphy.util.duplicate_detection import remove_duplicate_layers +from stratigraphy.util.extract_text import extract_text_lines from stratigraphy.util.language_detection import detect_language_of_document from stratigraphy.util.plot_utils import plot_lines from stratigraphy.util.util import flatten, read_params @@ -176,9 +177,10 @@ def start_pipeline( page_number = page_index + 1 logger.info("Processing page %s", page_number) + text_lines = extract_text_lines(page) geometric_lines = extract_lines(page, line_detection_params) layer_predictions, depths_materials_column_pairs = process_page( - page, geometric_lines, language, **matching_params + text_lines, geometric_lines, language, **matching_params ) # Add remove duplicates here! if page_index > 0: diff --git a/src/stratigraphy/util/coordinate_extraction.py b/src/stratigraphy/util/coordinate_extraction.py index 8328466b..93054525 100644 --- a/src/stratigraphy/util/coordinate_extraction.py +++ b/src/stratigraphy/util/coordinate_extraction.py @@ -7,6 +7,8 @@ import fitz import regex +from stratigraphy.util.extract_text import extract_text_lines +from stratigraphy.util.line import TextLine from stratigraphy.util.util import read_params logger = logging.getLogger(__name__) @@ -134,7 +136,7 @@ def __init__(self, document: fitz.Document): self.doc = document self.coordinate_keys = read_params("matching_params.yml")["coordinate_keys"] - def find_coordinate_key(self, text: str, allowed_errors: int = 3) -> str | None: # noqa: E501 + def find_coordinate_key(self, lines: list[TextLine], allowed_errors: int = 3) -> TextLine | None: # noqa: E501 """Finds the location of a coordinate key in a string of text. 
This is useful to reduce the text within which the coordinates are searched. If the text is too large @@ -146,47 +148,49 @@ def find_coordinate_key(self, text: str, allowed_errors: int = 3) -> str | None: Args: - text (str): Arbitrary string of text. + lines (list[TextLine]): Arbitrary text lines to search in. allowed_errors (int, optional): The maximum number of errors (Levenshtein distance) to consider a key contained in text. Defaults to 3 (guestimation; no optimisation done yet). Returns: - str | None: The coordinate key found in the text. + TextLine | None: The line of the coordinate key found in the text. """ matches = [] for key in self.coordinate_keys: - match = regex.search(r"\b(" + key + "){e<" + str(allowed_errors) + r"}\s", text, flags=regex.IGNORECASE) - if match: - matches.append((match.group(), sum(match.fuzzy_counts))) + pattern = regex.compile(r"\b(" + key + "){e<" + str(allowed_errors) + r"}\s", flags=regex.IGNORECASE) + for line in lines: + match = pattern.search(line.text) + if match: + matches.append((line, sum(match.fuzzy_counts))) # if no match was found, return None - if matches == []: + if len(matches) == 0: return None - best_match = sorted(matches, key=lambda x: x[1], reverse=True)[0][0] - - return best_match + best_match = min(matches, key=lambda x: x[1]) + return best_match[0] - def get_coordinate_substring(self, text: str) -> str: + def get_coordinate_substring(self, lines: list[TextLine], page_width: float) -> str: """Returns the substring of a text that contains the coordinate information. Args: - text (str): Arbitrary string of text. + lines (list[TextLine]): The lines of text to search in. + page_width (float): The width of the page (in points / PyMuPDF coordinates) Returns: - str: The substring of the text that contains the coordinate information. + str: The substring of the text that is close to an identified coordinate key (empty if no key is found).
""" # find the key that indicates the coordinate information - key = self.find_coordinate_key(text) - - # if no key was found, return None - if key is None: + coordinate_key_line = self.find_coordinate_key(lines) + if coordinate_key_line is None: return "" - coord_start = text.find(key) + len(key) - coord_end = coord_start + 100 # 100 seems to be enough to capture the coordinates; - # and not too much to introduce random numbers - substring = text[coord_start:coord_end] + key_rect = coordinate_key_line.rect + # look for coordinate values to the right and/or immediately below the key + coordinate_search_rect = fitz.Rect(key_rect.x0, key_rect.y0, page_width, key_rect.y1 + 3 * key_rect.height) + coordinate_search_lines = [line for line in lines if line.rect.intersects(coordinate_search_rect)] + + substring = " ".join([line.text for line in coordinate_search_lines]) substring = substring.replace(",", ".") substring = substring.replace("'", ".") substring = substring.replace("o", "0") # frequent ocr error @@ -247,47 +251,50 @@ def extract_coordinates(self) -> Coordinate | None: Returns: Coordinate | None: the extracted coordinates (if any) """ - text = "" for page in self.doc: - text += page.get_text() - text = text.replace("\n", " ") - - # Try to get the text by including explicit 'X' and 'Y' labels. - # In this case, we can allow for some whitespace in between the numbers. - # In some older borehole profile the OCR may recognize whitespace between two digits. - x_values = [int("".join(groups)) for groups in regex.findall(r"X[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text)] - y_values = [int("".join(groups)) for groups in regex.findall(r"Y[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text)] - # We are only checking the 1st x-value with the 1st y-value, the 2nd x-value with the 2nd y-value, etc. - # In some edge cases, the matched x_values and y-values might not be aligned / equal in number. 
However, - # we ignore this for now, as almost always, the 1st x and y values are already the ones that we are looking - # for. - coordinate_values = list(zip(x_values, y_values, strict=False)) - - if len(coordinate_values) == 0: - # get the substring that contains the coordinate information - coord_substring = self.get_coordinate_substring(text) - coordinate_values = self.get_coordinate_pairs(coord_substring) - - if len(coordinate_values) == 0: - # if that doesn't work, try to directly detect coordinates in the text - coordinate_values = self.get_coordinate_pairs(text) - - if len(coordinate_values) == 0: - logger.info("No coordinates found in this borehole profile.") - return None - - for east, north in coordinate_values: - if east > 1e6 and north > 1e6: - coordinate = LV95Coordinate( - CoordinateEntry(east), - CoordinateEntry(north), - ) - else: - coordinate = LV03Coordinate( - CoordinateEntry(east), - CoordinateEntry(north), - ) - if coordinate.is_valid(): - return coordinate - - logger.warning(f"Could not extract valid coordinates from {coordinate_values}") + text = page.get_text() + text = text.replace("\n", " ") + + lines = extract_text_lines(page) + + # Try to get the text by including explicit 'X' and 'Y' labels. + # In this case, we can allow for some whitespace in between the numbers. + # In some older borehole profile the OCR may recognize whitespace between two digits. + x_values = [ + int("".join(groups)) for groups in regex.findall(r"X[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text) + ] + y_values = [ + int("".join(groups)) for groups in regex.findall(r"Y[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text) + ] + # We are only checking the 1st x-value with the 1st y-value, the 2nd x-value with the 2nd y-value, etc. + # In some edge cases, the matched x_values and y-values might not be aligned / equal in number. However, + # we ignore this for now, as almost always, the 1st x and y values are already the ones that we are looking + # for. 
+ coordinate_values = list(zip(x_values, y_values, strict=False)) + + if len(coordinate_values) == 0: + # get the substring that contains the coordinate information + coord_substring = self.get_coordinate_substring(lines, page.rect.width) + coordinate_values = self.get_coordinate_pairs(coord_substring) + + if len(coordinate_values) == 0: + # if that doesn't work, try to directly detect coordinates in the text + coordinate_values = self.get_coordinate_pairs(text) + + for east, north in coordinate_values: + if east > 1e6 and north > 1e6: + coordinate = LV95Coordinate( + CoordinateEntry(east), + CoordinateEntry(north), + ) + else: + coordinate = LV03Coordinate( + CoordinateEntry(east), + CoordinateEntry(north), + ) + if coordinate.is_valid(): + return coordinate + + logger.warning(f"Could not extract valid coordinates from {coordinate_values}") + + logger.info("No coordinates found in this borehole profile.") diff --git a/src/stratigraphy/util/extract_text.py b/src/stratigraphy/util/extract_text.py new file mode 100644 index 00000000..fe78fa72 --- /dev/null +++ b/src/stratigraphy/util/extract_text.py @@ -0,0 +1,45 @@ +"""Methods for extracting plain text from a PDF document.""" + +import fitz + +from stratigraphy.util.line import TextLine, TextWord + + +def extract_text_lines(page: fitz.Page) -> list[TextLine]: + """Extract all text lines from the page. + + Sometimes, a single line, as identified by PyMuPDF, is still split into separate lines. + + Args: + page (fitz.Page): the page to extract text from + + Returns: + list[TextLine]: A list of text lines.
+ """ + words = [] + words_by_line = {} + for x0, y0, x1, y1, word, block_no, line_no, _word_no in fitz.utils.get_text(page, "words"): + rect = fitz.Rect(x0, y0, x1, y1) * page.rotation_matrix + text_word = TextWord(rect, word) + words.append(text_word) + key = f"{block_no}_{line_no}" + if key not in words_by_line: + words_by_line[key] = [] + words_by_line[key].append(text_word) + + raw_lines = [TextLine(words_by_line[key]) for key in words_by_line] + + lines = [] + current_line_words = [] + for line_index, raw_line in enumerate(raw_lines): + for word_index, word in enumerate(raw_line.words): + remaining_line = TextLine(raw_line.words[word_index:]) + if len(current_line_words) > 0 and remaining_line.is_line_start(lines, raw_lines[line_index + 1 :]): + lines.append(TextLine(current_line_words)) + current_line_words = [] + current_line_words.append(word) + if len(current_line_words): + lines.append(TextLine(current_line_words)) + current_line_words = [] + + return lines