Skip to content

Commit

Permalink
simplify significant_arithmetic_progression + create BoundaryDepthColumnValidator class
Browse files Browse the repository at this point in the history
  • Loading branch information
stijnvermeeren-swisstopo committed Jun 21, 2024
1 parent ee459a5 commit d37c03b
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 152 deletions.
135 changes: 135 additions & 0 deletions src/stratigraphy/util/boundarydepthcolumnvalidator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""This module contains logic to validate BoundaryDepthColumn instances."""

import dataclasses

import numpy as np

from stratigraphy.util.depthcolumn import BoundaryDepthColumn
from stratigraphy.util.depthcolumnentry import DepthColumnEntry
from stratigraphy.util.line import TextWord


@dataclasses.dataclass
class BoundaryDepthColumnValidator:
    """Validation logic for instances of the BoundaryDepthColumn class.

    Args:
        all_words (list[TextWord]): A list of all text words on the page.
        noise_count_threshold (float): Noise count threshold deciding how much noise is allowed in a column
                                       to be valid.
        noise_count_offset (int): Offset for the noise count threshold. Affects the noise count criterion.
                                  Effective specifically for depth columns with very few entries.
    """

    all_words: list[TextWord]
    noise_count_threshold: float
    noise_count_offset: int

    def is_valid(self, column: BoundaryDepthColumn) -> bool:
        """Checks whether the depth column is valid.

        The depth column is considered valid if:
        - The number of entries is at least 3.
        - The number of words that intersect with the depth column entries is less than the noise count
          threshold times the squared number of entries minus the noise count offset.
        - The entries are strictly increasing.
        - The entries are linearly correlated with their vertical position.

        Note: The noise count criteria may require a rehaul. Some depth columns are not recognized as valid
        even though they are.

        Args:
            column (BoundaryDepthColumn): The depth column to validate.

        Returns:
            bool: True if the depth column is valid, False otherwise.
        """
        if len(column.entries) < 3:
            return False

        # When too much other text is in the column, then it is probably not valid.
        # The quadratic behavior of the noise count check makes the check stricter for columns with few
        # entries than for columns with more entries. The more entries we have, the less likely it is that
        # we found them by chance.
        # TODO: Once evaluation data is of good enough quality, we should optimize for the parameter below.
        noise_limit = self.noise_count_threshold * (len(column.entries) - self.noise_count_offset) ** 2
        if column.noise_count(self.all_words) > noise_limit:
            return False

        # Check if the entries are strictly increasing.
        if not all(a.value < b.value for a, b in zip(column.entries, column.entries[1:], strict=False)):
            return False

        corr_coef = column.pearson_correlation_coef()

        # Magic numbers obtained using an error analysis on critical borehole profiles. Admittedly, this may
        # be overfitted to the borehole profiles present. bool(...) ensures a real boolean is returned even
        # when corr_coef is None or 0.0.
        return bool(
            corr_coef and corr_coef > min(1.0382 - len(column.entries) * 0.01, 0.9985) and corr_coef > 0.95
        )

    def reduce_until_valid(self, column: BoundaryDepthColumn) -> BoundaryDepthColumn | None:
        """Removes entries from the depth column until it fulfills the is_valid condition.

        is_valid checks whether there is too much noise (i.e. other text) in the column and whether the
        entries are linearly correlated with their vertical position.

        Args:
            column (BoundaryDepthColumn): The depth column to validate.

        Returns:
            BoundaryDepthColumn | None: The depth column with entries removed until it is valid, or None if
                                        no valid reduction was found.
        """
        while column:
            if self.is_valid(column):
                return column
            # Compute the OCR correction once instead of twice (it builds and scores one candidate column
            # per entry, so it is not free).
            corrected = self.correct_OCR_mistakes(column)
            if corrected is not None:
                return corrected
            column = column.remove_entry_by_correlation_gradient()
        return None

    def correct_OCR_mistakes(self, column: BoundaryDepthColumn) -> BoundaryDepthColumn | None:
        """Corrects OCR mistakes in the depth column entries.

        Loops through all values and corrects common OCR mistakes for the given entry. Then, the column with
        the highest pearson correlation coefficient is selected and checked for validity.

        This is useful if one entry has an OCR mistake, and the column is not valid because of it.

        Note: Common mistakes should be extended as needed.

        Args:
            column (BoundaryDepthColumn): The depth column to validate.

        Returns:
            BoundaryDepthColumn | None: The corrected depth column, or None if no correction was possible.
        """
        # One candidate column per entry, with that single entry OCR-corrected.
        new_columns = [
            BoundaryDepthColumn(
                [
                    _correct_entry(entry) if index == remove_index else entry
                    for index, entry in enumerate(column.entries)
                ],
            )
            for remove_index in range(len(column.entries))
        ]
        if not new_columns:  # no entries at all -> nothing to correct (avoids max() on an empty sequence)
            return None

        best_column = max(new_columns, key=lambda candidate: candidate.pearson_correlation_coef())
        return best_column if self.is_valid(best_column) else None


def _correct_entry(entry: DepthColumnEntry) -> DepthColumnEntry:
    """Corrects frequent OCR errors in depth column entries.

    Args:
        entry (DepthColumnEntry): The depth column entry to correct.

    Returns:
        DepthColumnEntry: The corrected depth column entry.
    """
    # In older documents, OCR sometimes mistakes 1 for 4; substitute every occurrence and let the caller
    # decide (via correlation) whether the corrected value is an improvement.
    corrected_text = str(entry.value).replace("4", "1")
    return DepthColumnEntry(entry.rect, float(corrected_text))
168 changes: 19 additions & 149 deletions src/stratigraphy/util/depthcolumn.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from stratigraphy.util.depthcolumnentry import DepthColumnEntry, LayerDepthColumnEntry
from stratigraphy.util.find_description import get_description_blocks
from stratigraphy.util.interval import BoundaryInterval, Interval, LayerInterval
from stratigraphy.util.line import TextLine
from stratigraphy.util.line import TextLine, TextWord


class DepthColumn(metaclass=abc.ABCMeta):
Expand Down Expand Up @@ -47,7 +47,7 @@ def min_x1(self) -> float:
return min([rect.x1 for rect in self.rects()])

@abc.abstractmethod
def noise_count(self, all_words: list[TextLine]) -> int:
def noise_count(self, all_words: list[TextWord]) -> int:
pass

@abc.abstractmethod
Expand Down Expand Up @@ -97,7 +97,7 @@ def depth_intervals(self) -> list[Interval]:
def rects(self) -> list[fitz.Rect]:
return [entry.rect for entry in self.entries]

def noise_count(self, all_words: list[TextLine]) -> int:
def noise_count(self, all_words: list[TextWord]) -> int:
# currently, we don't count noise for layer columns
return 0

Expand Down Expand Up @@ -172,21 +172,14 @@ class BoundaryDepthColumn(DepthColumn):

entries: list[DepthColumnEntry]

def __init__(self, noise_count_threshold: float, noise_count_offset: int, entries: list = None):
def __init__(self, entries: list = None):
"""Initializes a BoundaryDepthColumn object.
Args:
noise_count_threshold (float): Noise count threshold deciding how much noise is allowed in a column
to be valid.
noise_count_offset (int): Offset for the noise count threshold. Affects the noise count criterion.
Effective specifically for depth columns with very few entries.
entries (list, optional): Depth Column Entries for the depth column. Defaults to None.
"""
super().__init__()

self.noise_count_threshold = noise_count_threshold
self.noise_count_offset = noise_count_offset

if entries is not None:
self.entries = entries
else:
Expand Down Expand Up @@ -231,12 +224,10 @@ def can_be_appended(self, rect: fitz.Rect) -> bool:

def valid_initial_segment(self, rect: fitz.Rect) -> BoundaryDepthColumn:
for i in range(len(self.entries) - 1):
initial_segment = BoundaryDepthColumn(
self.noise_count_threshold, self.noise_count_offset, self.entries[: -i - 1]
)
initial_segment = BoundaryDepthColumn(self.entries[: -i - 1])
if initial_segment.can_be_appended(rect):
return initial_segment
return BoundaryDepthColumn(self.noise_count_threshold, self.noise_count_offset)
return BoundaryDepthColumn()

def strictly_contains(self, other: BoundaryDepthColumn):
return len(other.entries) < len(self.entries) and all(
Expand All @@ -262,15 +253,14 @@ def depth_intervals(self) -> list[BoundaryInterval]:
return depth_intervals

def significant_arithmetic_progression(self) -> bool:
if len(self.entries) < 7:
# to allow for OCR errors or gaps in the progression, we only require a segment of length 6 that is an
# arithmetic progression
segment_length = 6
if len(self.entries) < segment_length:
return self.is_arithmetic_progression()
else:
# to allow for OCR errors or gaps in the progression, we only require a segment of length 7 that is an
# arithmetic progression
for i in range(len(self.entries) - 7 + 1):
if BoundaryDepthColumn(
self.noise_count_threshold, self.noise_count_offset, self.entries[i : i + 7]
).is_arithmetic_progression():
for i in range(len(self.entries) - segment_length + 1):
if BoundaryDepthColumn(self.entries[i : i + segment_length]).is_arithmetic_progression():
return True
return False

Expand All @@ -286,59 +276,15 @@ def is_arithmetic_progression(self) -> bool:
return False

scale_pearson_correlation_coef = np.corrcoef(entries, progression)[0, 1].item()
if len(self.entries) < 6: # It is more likely that fewer entries are accidently very much correlated
return abs(scale_pearson_correlation_coef) >= 0.9999
else:
return abs(scale_pearson_correlation_coef) >= 0.999

def is_valid(self, all_words: list[TextLine]) -> bool:
"""Checks whether the depth column is valid.
The depth column is considered valid if:
- The number of entries is at least 3.
- The number of words that intersect with the depth column entries is less than the noise count threshold
time the number of entries minus the noise count offset.
- The entries are linearly correlated with their vertical position.
Note: The noise count criteria may require a rehaul. Some depth columns are not recognized as valid
even though they are.
Args:
all_words (list[TextLine]): A list of all text lines on the page.
Returns:
bool: True if the depth column is valid, False otherwise.
"""
if len(self.entries) < 3:
return False

# When too much other text is in the column, then it is probably not valid.
# The quadratic behavior of the noise count check makes the check strictoer for columns with few entries
# than columns with more entries. The more entries we have, the less likely it is that we found them by chance.
# TODO: Once evaluation data is of good enough qualities, we should optimize for the parameter below.
if (
self.noise_count(all_words)
> self.noise_count_threshold * (len(self.entries) - self.noise_count_offset) ** 2
):
return False
# Check if the entries are strictly increasing.
if not all(i.value < j.value for i, j in zip(self.entries, self.entries[1:], strict=False)):
return False

corr_coef = self.pearson_correlation_coef()
return abs(scale_pearson_correlation_coef) >= 0.9999

return (
corr_coef and corr_coef > np.min([1.0382 - len(self.entries) * 0.01, 0.9985]) and corr_coef > 0.95
) # Magic numbers obtained using an error analysis on critical borehole profiles. Admittedly, this may
# be overfitted to the borehole profiles present.

def noise_count(self, all_words: list[TextLine]) -> int:
def noise_count(self, all_words: list[TextWord]) -> int:
"""Counts the number of words that intersect with the depth column entries.
Returns the number of words that intersect with the depth column entries, but are not part of the depth column.
Args:
all_words (list[TextLine]): A list of all text lines on the page.
all_words (list[TextWord]): A list of all text lines on the page.
Returns:
int: The number of words that intersect with the depth column entries but are not part of it.
Expand All @@ -360,72 +306,12 @@ def pearson_correlation_coef(self) -> float:

return np.corrcoef(positions, entries)[0, 1].item()

def reduce_until_valid(self, all_words: list[TextLine]) -> BoundaryDepthColumn:
"""Removes entries from the depth column until it is fullfills the is_valid condition.
is_valid checks whether there is too much noise (i.e. other text) in the column and whether the entries are
linearly correlated with their vertical position.
Args:
all_words (list[TextLine]): A list of all text lines on the page.
Returns:
BoundaryDepthColumn: The current depth column with entries removed until it is valid.
"""
current = self
while current:
if current.is_valid(all_words):
return current
elif current.correct_OCR_mistakes(all_words) is not None:
return current.correct_OCR_mistakes(all_words)
else:
current = current.remove_entry_by_correlation_gradient()

def correct_OCR_mistakes(self, all_words: list[TextLine]) -> BoundaryDepthColumn | None:
"""Corrects OCR mistakes in the depth column entries.
Loops through all values and corrects common OCR mistakes for the given entry. Then, the column with the
hightest pearson correlation coefficient is selected and checked for validity.
This is useful if one entry has an OCR mistake, and the column is not valid because of it.
Note: Common mistakes should be extended as needed.
Args:
all_words (list[TextLine]): A list of all text lines on the page.
Returns:
BoundaryDepthColumn | None: The corrected depth column, or None if no correction was possible.
"""
new_columns = []
for remove_index in range(len(self.entries)):
new_columns.append(
BoundaryDepthColumn(
self.noise_count_threshold,
self.noise_count_offset,
[
entry if index != remove_index else _correct_entry(entry)
for index, entry in enumerate(self.entries)
],
),
)
best_column = max(new_columns, key=lambda column: column.pearson_correlation_coef())

if best_column.is_valid(all_words):
return best_column
else:
return None

def remove_entry_by_correlation_gradient(self) -> BoundaryDepthColumn | None:
if len(self.entries) < 3:
return None

new_columns = [
BoundaryDepthColumn(
self.noise_count_threshold,
self.noise_count_offset,
[entry for index, entry in enumerate(self.entries) if index != remove_index],
)
BoundaryDepthColumn([entry for index, entry in enumerate(self.entries) if index != remove_index])
for remove_index in range(len(self.entries))
]
return max(new_columns, key=lambda column: column.pearson_correlation_coef())
Expand All @@ -450,9 +336,7 @@ def break_on_double_descending(self) -> list[BoundaryDepthColumn]:
if len(final_segment):
segments.append(final_segment)

return [
BoundaryDepthColumn(self.noise_count_threshold, self.noise_count_offset, segment) for segment in segments
]
return [BoundaryDepthColumn(segment) for segment in segments]

def identify_groups(
self,
Expand Down Expand Up @@ -523,26 +407,12 @@ def identify_groups(
current_blocks = post
current_intervals = []
else:
# The final open ended interval should not be added, since borehole profiles do typically not come
# with open ended intervals.
# The final open-ended interval should not be added, since borehole profiles do typically not come
# with open-ended intervals.
if interval.end is not None:
current_intervals.append(interval)

if len(current_intervals) > 0 or len(current_blocks) > 0:
groups.append({"depth_intervals": current_intervals, "blocks": current_blocks})

return groups


def _correct_entry(entry: DepthColumnEntry) -> DepthColumnEntry:
"""Corrects frequent OCR errors in depth column entries.
Args:
entry (DepthColumnEntry): The depth column entry to correct.
Returns:
DepthColumnEntry: The corrected depth column entry.
"""
text_value = str(entry.value)
text_value = text_value.replace("4", "1") # In older documents, OCR sometimes mistakes 1 for 4
return DepthColumnEntry(entry.rect, float(text_value))
Loading

0 comments on commit d37c03b

Please sign in to comment.