Skip to content

Commit

Permalink
use lines instead of raw text for finding coordinate keys
Browse files Browse the repository at this point in the history
  • Loading branch information
stijnvermeeren-swisstopo committed Jun 4, 2024
1 parent 6dedc85 commit 43338f5
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 95 deletions.
36 changes: 6 additions & 30 deletions src/stratigraphy/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
find_layer_identifier_column,
find_layer_identifier_column_entries,
)
from stratigraphy.util.line import TextLine, TextWord
from stratigraphy.util.line import TextLine
from stratigraphy.util.textblock import TextBlock, block_distance
from stratigraphy.util.util import (
parse_and_remove_empty_predictions,
Expand All @@ -30,46 +30,20 @@
logger = logging.getLogger(__name__)


def process_page(page: fitz.Page, geometric_lines, language: str, **params: dict) -> list[dict]:
def process_page(lines: list[TextLine], geometric_lines, language: str, **params: dict) -> list[dict]:
"""Process a single page of a pdf.
Finds all descriptions and depth intervals on the page and matches them.
Args:
page (fitz.Page): The page to process.
lines (list[TextLine]): all the text lines on the page.
geometric_lines (list[Line]): The geometric lines of the page.
language (str): The language of the page.
**params (dict): Additional parameters for the matching pipeline.
Returns:
list[dict]: A list of the text of all description blocks.
"""
words = []
words_by_line = {}
for x0, y0, x1, y1, word, block_no, line_no, _word_no in fitz.utils.get_text(page, "words"):
rect = fitz.Rect(x0, y0, x1, y1) * page.rotation_matrix
text_word = TextWord(rect, word)
words.append(text_word)
key = f"{block_no}_{line_no}"
if key not in words_by_line:
words_by_line[key] = []
words_by_line[key].append(text_word)

raw_lines = [TextLine(words_by_line[key]) for key in words_by_line]

lines = []
current_line_words = []
for line_index, raw_line in enumerate(raw_lines):
for word_index, word in enumerate(raw_line.words):
remaining_line = TextLine(raw_line.words[word_index:])
if len(current_line_words) > 0 and remaining_line.is_line_start(lines, raw_lines[line_index + 1 :]):
lines.append(TextLine(current_line_words))
current_line_words = []
current_line_words.append(word)
if len(current_line_words):
lines.append(TextLine(current_line_words))
current_line_words = []

# Detect Layer Index Columns
layer_identifier_entries = find_layer_identifier_column_entries(lines)
layer_identifier_columns = (
Expand All @@ -84,10 +58,12 @@ def process_page(page: fitz.Page, geometric_lines, language: str, **params: dict
if material_description_rect:
pairs.append((layer_identifier_column, material_description_rect))

# Obtain the best pair. In contrast do depth columns, there only ever is one layer index column per page.
# Obtain the best pair. In contrast to depth columns, there only ever is one layer index column per page.
if pairs:
pairs.sort(key=lambda pair: score_column_match(pair[0], pair[1]))

words = [word for line in lines for word in line.words]

# If there is a layer identifier column, then we use this directly.
# Else, we search for depth columns. We could also think of some scoring mechanism to decide which one to use.
if not pairs:
Expand Down
4 changes: 3 additions & 1 deletion src/stratigraphy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from stratigraphy.util.coordinate_extraction import CoordinateExtractor
from stratigraphy.util.draw import draw_predictions
from stratigraphy.util.duplicate_detection import remove_duplicate_layers
from stratigraphy.util.extract_text import extract_text_lines
from stratigraphy.util.language_detection import detect_language_of_document
from stratigraphy.util.plot_utils import plot_lines
from stratigraphy.util.util import flatten, read_params
Expand Down Expand Up @@ -176,9 +177,10 @@ def start_pipeline(
page_number = page_index + 1
logger.info("Processing page %s", page_number)

text_lines = extract_text_lines(page)
geometric_lines = extract_lines(page, line_detection_params)
layer_predictions, depths_materials_column_pairs = process_page(
page, geometric_lines, language, **matching_params
text_lines, geometric_lines, language, **matching_params
)
# Add remove duplicates here!
if page_index > 0:
Expand Down
135 changes: 71 additions & 64 deletions src/stratigraphy/util/coordinate_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import fitz
import regex

from stratigraphy.util.extract_text import extract_text_lines
from stratigraphy.util.line import TextLine
from stratigraphy.util.util import read_params

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -134,7 +136,7 @@ def __init__(self, document: fitz.Document):
self.doc = document
self.coordinate_keys = read_params("matching_params.yml")["coordinate_keys"]

def find_coordinate_key(self, text: str, allowed_errors: int = 3) -> str | None: # noqa: E501
def find_coordinate_key(self, lines: list[TextLine], allowed_errors: int = 3) -> TextLine | None: # noqa: E501
"""Finds the location of a coordinate key in a string of text.
This is useful to reduce the text within which the coordinates are searched. If the text is too large
Expand All @@ -146,47 +148,49 @@ def find_coordinate_key(self, text: str, allowed_errors: int = 3) -> str | None:
Args:
text (str): Arbitrary string of text.
lines (list[TextLine]): Arbitrary text lines to search in.
allowed_errors (int, optional): Exclusive upper bound on the number of errors (Levenshtein distance) for a key
to be considered contained in a line. Defaults to 3 (guesstimate; no optimisation done yet).
Returns:
str | None: The coordinate key found in the text.
TextLine | None: The line of the coordinate key found in the text.
"""
matches = []
for key in self.coordinate_keys:
match = regex.search(r"\b(" + key + "){e<" + str(allowed_errors) + r"}\s", text, flags=regex.IGNORECASE)
if match:
matches.append((match.group(), sum(match.fuzzy_counts)))
pattern = regex.compile(r"\b(" + key + "){e<" + str(allowed_errors) + r"}\s", flags=regex.IGNORECASE)
for line in lines:
match = pattern.search(line.text)
if match:
matches.append((line, sum(match.fuzzy_counts)))

# if no match was found, return None
if matches == []:
if len(matches) == 0:
return None

best_match = sorted(matches, key=lambda x: x[1], reverse=True)[0][0]

return best_match
best_match = min(matches, key=lambda x: x[1])
return best_match[0]

def get_coordinate_substring(self, text: str) -> str:
def get_coordinate_substring(self, lines: list[TextLine], page_width: float) -> str:
"""Returns the substring of a text that contains the coordinate information.
Args:
text (str): Arbitrary string of text.
lines (list[TextLine]): The lines of text to search in.
page_width (float): The width of the page (in points / PyMuPDF coordinates)
Returns:
str: The substring of the text that contains the coordinate information.
str: The text that is close to an identified coordinate key, or an empty string if no key was found.
"""
# find the key that indicates the coordinate information
key = self.find_coordinate_key(text)

# if no key was found, return None
if key is None:
coordinate_key_line = self.find_coordinate_key(lines)
if coordinate_key_line is None:
return ""

coord_start = text.find(key) + len(key)
coord_end = coord_start + 100 # 100 seems to be enough to capture the coordinates;
# and not too much to introduce random numbers
substring = text[coord_start:coord_end]
key_rect = coordinate_key_line.rect
# look for coordinate values to the right and/or immediately below the key
coordinate_search_rect = fitz.Rect(key_rect.x0, key_rect.y0, page_width, key_rect.y1 + 3 * key_rect.height)
coordinate_search_lines = [line for line in lines if line.rect.intersects(coordinate_search_rect)]

substring = " ".join([line.text for line in coordinate_search_lines])
substring = substring.replace(",", ".")
substring = substring.replace("'", ".")
substring = substring.replace("o", "0") # frequent ocr error
Expand Down Expand Up @@ -247,47 +251,50 @@ def extract_coordinates(self) -> Coordinate | None:
Returns:
Coordinate | None: the extracted coordinates (if any)
"""
text = ""
for page in self.doc:
text += page.get_text()
text = text.replace("\n", " ")

# Try to get the text by including explicit 'X' and 'Y' labels.
# In this case, we can allow for some whitespace in between the numbers.
# In some older borehole profile the OCR may recognize whitespace between two digits.
x_values = [int("".join(groups)) for groups in regex.findall(r"X[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text)]
y_values = [int("".join(groups)) for groups in regex.findall(r"Y[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text)]
# We are only checking the 1st x-value with the 1st y-value, the 2nd x-value with the 2nd y-value, etc.
# In some edge cases, the matched x_values and y-values might not be aligned / equal in number. However,
# we ignore this for now, as almost always, the 1st x and y values are already the ones that we are looking
# for.
coordinate_values = list(zip(x_values, y_values, strict=False))

if len(coordinate_values) == 0:
# get the substring that contains the coordinate information
coord_substring = self.get_coordinate_substring(text)
coordinate_values = self.get_coordinate_pairs(coord_substring)

if len(coordinate_values) == 0:
# if that doesn't work, try to directly detect coordinates in the text
coordinate_values = self.get_coordinate_pairs(text)

if len(coordinate_values) == 0:
logger.info("No coordinates found in this borehole profile.")
return None

for east, north in coordinate_values:
if east > 1e6 and north > 1e6:
coordinate = LV95Coordinate(
CoordinateEntry(east),
CoordinateEntry(north),
)
else:
coordinate = LV03Coordinate(
CoordinateEntry(east),
CoordinateEntry(north),
)
if coordinate.is_valid():
return coordinate

logger.warning(f"Could not extract valid coordinates from {coordinate_values}")
text = page.get_text()
text = text.replace("\n", " ")

lines = extract_text_lines(page)

# Try to get the text by including explicit 'X' and 'Y' labels.
# In this case, we can allow for some whitespace in between the numbers.
# In some older borehole profile the OCR may recognize whitespace between two digits.
x_values = [
int("".join(groups)) for groups in regex.findall(r"X[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text)
]
y_values = [
int("".join(groups)) for groups in regex.findall(r"Y[=:\s]{0,3}" + COORDINATE_ENTRY_REGEX, text)
]
# We are only checking the 1st x-value with the 1st y-value, the 2nd x-value with the 2nd y-value, etc.
# In some edge cases, the matched x_values and y-values might not be aligned / equal in number. However,
# we ignore this for now, as almost always, the 1st x and y values are already the ones that we are looking
# for.
coordinate_values = list(zip(x_values, y_values, strict=False))

if len(coordinate_values) == 0:
# get the substring that contains the coordinate information
coord_substring = self.get_coordinate_substring(lines, page.rect.width)
coordinate_values = self.get_coordinate_pairs(coord_substring)

if len(coordinate_values) == 0:
# if that doesn't work, try to directly detect coordinates in the text
coordinate_values = self.get_coordinate_pairs(text)

for east, north in coordinate_values:
if east > 1e6 and north > 1e6:
coordinate = LV95Coordinate(
CoordinateEntry(east),
CoordinateEntry(north),
)
else:
coordinate = LV03Coordinate(
CoordinateEntry(east),
CoordinateEntry(north),
)
if coordinate.is_valid():
return coordinate

logger.warning(f"Could not extract valid coordinates from {coordinate_values}")

logger.info("No coordinates found in this borehole profile.")
45 changes: 45 additions & 0 deletions src/stratigraphy/util/extract_text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Methods for extracting plain text from a PDF document."""

import fitz

from stratigraphy.util.line import TextLine, TextWord


def extract_text_lines(page: fitz.Page) -> list[TextLine]:
    """Extract all text lines from the page.

    Words are first grouped into raw lines using the block and line numbers reported by PyMuPDF. Sometimes, a
    single line as identified by PyMuPDF still needs to be split into separate lines; this is decided word by
    word with `TextLine.is_line_start`.

    Args:
        page (fitz.Page): the page to extract text from

    Returns:
        list[TextLine]: A list of text lines.
    """
    # Group the words by (block, line). Dicts preserve insertion order, so the raw lines keep the order in
    # which PyMuPDF returned their first word.
    words_by_line = {}
    for x0, y0, x1, y1, word, block_no, line_no, _word_no in fitz.utils.get_text(page, "words"):
        # Apply the page's rotation matrix to the word rectangle.
        rect = fitz.Rect(x0, y0, x1, y1) * page.rotation_matrix
        words_by_line.setdefault((block_no, line_no), []).append(TextWord(rect, word))

    raw_lines = [TextLine(line_words) for line_words in words_by_line.values()]

    lines = []
    for line_index, raw_line in enumerate(raw_lines):
        # The raw lines after the current one do not change while iterating over its words.
        following_raw_lines = raw_lines[line_index + 1 :]
        current_line_words = []
        for word_index, word in enumerate(raw_line.words):
            remaining_line = TextLine(raw_line.words[word_index:])
            # Close the line collected so far when the rest of the raw line is detected as a new line start.
            if current_line_words and remaining_line.is_line_start(lines, following_raw_lines):
                lines.append(TextLine(current_line_words))
                current_line_words = []
            current_line_words.append(word)
        # Flush whatever is left of this raw line before moving on to the next one.
        if current_line_words:
            lines.append(TextLine(current_line_words))

    return lines

0 comments on commit 43338f5

Please sign in to comment.