diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4dbc0bc9..5d8efd65 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+# 3.0.2
+- Fixed regression in Exists predicate
+
 # 3.0.1
 - Fixed regression in Exists predicate
 
diff --git a/dedupe/labeler.py b/dedupe/labeler.py
index bc024140..f6b45bd2 100644
--- a/dedupe/labeler.py
+++ b/dedupe/labeler.py
@@ -343,23 +343,49 @@ class DisagreementLearner(HasCandidates):
     def __init__(self) -> None:
         self.y: numpy.typing.NDArray[numpy.int_] = numpy.array([])
         self.pairs: TrainingExamples = []
+        self.rng = numpy.random.default_rng()
 
     def pop(self) -> TrainingExample:
-        if not len(self.candidates):
+        if not (n_candidates := len(self.candidates)):
             raise IndexError("No more unlabeled examples to label")
 
         prob_l = [learner.candidate_scores() for learner in self._learners]
         probs = numpy.concatenate(prob_l, axis=1)
 
         # where do the classifiers disagree?
-        disagreement = numpy.std(probs > 0.5, axis=1).astype(bool)
+        decisions = probs > 0.5
+        uncovered_disagreement = numpy.any(decisions != decisions[:, [0]], axis=1) * (
+            probs[:, 1] == 0
+        )
 
-        if disagreement.any():
-            conflicts = disagreement.nonzero()[0]
-            target = numpy.random.uniform(size=1)
-            uncertain_index = conflicts[numpy.argmax(probs[conflicts][:, 0] - target)]
+        if uncovered_disagreement.any():
+            # If there are records that the classifier thinks are
+            # matches but that we are not covering with a blocking
+            # rule, then choose one of those, with weights
+            # proportional to the classifier's confidence that it
+            # is a match. These are the most important to capture
+            # for the best possible recall.
+            weights = uncovered_disagreement * probs[:, 0]
+            weights /= weights.sum()
+            uncertain_index = self.rng.choice(n_candidates, p=weights)
+        elif (probs[:, 1] == 1).any():
+            # Otherwise, sample from records that are covered, uniformly
+            # across classifier confidence.
+            #
+            # We don't sample uniformly across covered records, because
+            # negative examples would dominate.
+            covered = (probs[:, 1] == 1).nonzero()[0]
+            target = random.random()
+            uncertain_index = covered[
+                numpy.argmin(numpy.absolute(probs[covered, 0] - target))
+            ]
         else:
-            uncertain_index = numpy.std(probs, axis=1).argmax()
+            # If there are no uncovered disagreements and no covered
+            # pairs, then choose a pair using weights related to the
+            # disagreement between the classifiers.
+            weights = numpy.std(probs, axis=1)
+            weights /= weights.sum()
+            uncertain_index = self.rng.choice(n_candidates, p=weights)
 
         logger.debug(
             "Classifier: %.2f, Covered: %s",
diff --git a/dedupe/training.py b/dedupe/training.py
index 8709fb8e..f5a957e0 100644
--- a/dedupe/training.py
+++ b/dedupe/training.py
@@ -65,12 +65,22 @@ def learn(
         logger.debug(uncoverable_dupes)
         target_cover = len(coverable_dupes)
 
-        if candidate_types == "simple":
-            candidate_cover = self.simple_candidates(match_cover, comparison_cover)
-        elif candidate_types == "random forest":
-            candidate_cover = self.random_forest_candidates(
-                match_cover, comparison_cover
-            )
+        candidate_cover = simple_candidates(match_cover, comparison_cover)
+
+        if candidate_types == "random forest":
+            # The random forest conjunctions can sometimes fail to cover
+            # all of the matches, so we always include the simple
+            # predicates to avoid that coverage loss.
+
+            # To avoid overfitting, we only start to include conjunctions
+            # as our training data reaches certain sizes.
+            K = max(math.floor(math.log10(len(matches))), 1)
+            if K > 1:
+                candidate_cover.update(
+                    random_forest_candidates(match_cover, comparison_cover, K)
+                )
+        elif candidate_types == "simple":
+            pass
         else:
             raise ValueError("candidate_type is not valid")
@@ -82,71 +92,6 @@
         return final_predicates
 
-    def simple_candidates(
-        self, match_cover: Cover, comparison_cover: ComparisonCover
-    ) -> Cover:
-        candidates = {}
-        for predicate, coverage in match_cover.items():
-            predicate.cover_count = len(comparison_cover[predicate])
-            candidates[predicate] = coverage.copy()
-
-        return candidates
-
-    def random_forest_candidates(
-        self,
-        match_cover: Cover,
-        comparison_cover: ComparisonCover,
-        K: int | None = None,
-    ) -> Cover:
-        predicates = list(match_cover)
-        matches = list(frozenset.union(*match_cover.values()))
-        pred_sample_size = max(int(math.sqrt(len(predicates))), 5)
-        candidates = {}
-        if K is None:
-            K = max(math.floor(math.log10(len(matches))), 1)
-
-        n_samples = 5000
-        for _ in range(n_samples):
-            sample_predicates = random.sample(predicates, pred_sample_size)
-            resampler = Resampler(matches)
-            sample_match_cover = {
-                pred: resampler(pairs) for pred, pairs in match_cover.items()
-            }
-
-            # initialize variables that will be
-            # the base for the constructing k-conjunctions
-            candidate = None
-            covered_comparisons: frozenset[RecordIDPair] | InfiniteSet = InfiniteSet()
-            covered_matches: frozenset[int] | InfiniteSet = InfiniteSet()
-            covered_sample_matches = InfiniteSet()
-
-            def score(predicate: Predicate) -> float:
-                try:
-                    return len(
-                        covered_sample_matches & sample_match_cover[predicate]
-                    ) / len(covered_comparisons & comparison_cover[predicate])
-                except ZeroDivisionError:
-                    return 0.0
-
-            for _ in range(K):
-                next_predicate = max(sample_predicates, key=score)
-                if candidate:
-                    candidate += next_predicate
-                else:
-                    candidate = next_predicate
-
-                covered_comparisons &= comparison_cover[next_predicate]
-                candidate.cover_count = len(covered_comparisons)
-
-                covered_matches &= match_cover[next_predicate]
-                candidates[candidate] = covered_matches
-
-                covered_sample_matches &= sample_match_cover[next_predicate]
-
-                sample_predicates.remove(next_predicate)
-
-        return candidates
-
     def cover(self, pairs: TrainingExamples, index_predicates: bool = True) -> Cover:
         predicate_cover = {}
         if index_predicates:
@@ -321,6 +266,68 @@ def coveredPairs(self, blocker, records_1, records_2):
     return pair_cover
 
 
+def simple_candidates(match_cover: Cover, comparison_cover: ComparisonCover) -> Cover:
+    candidates = {}
+    for predicate, coverage in match_cover.items():
+        predicate.cover_count = len(comparison_cover[predicate])
+        candidates[predicate] = coverage.copy()
+
+    return candidates
+
+
+def random_forest_candidates(
+    match_cover: Cover,
+    comparison_cover: ComparisonCover,
+    K: int,
+) -> Cover:
+    predicates = list(match_cover)
+    matches = list(frozenset.union(*match_cover.values()))
+    pred_sample_size = max(int(math.sqrt(len(predicates))), 5)
+    candidates = {}
+
+    n_samples = 5000
+    for _ in range(n_samples):
+        sample_predicates = random.sample(predicates, pred_sample_size)
+        resampler = Resampler(matches)
+        sample_match_cover = {
+            pred: resampler(pairs) for pred, pairs in match_cover.items()
+        }
+
+        # initialize the variables that will be the base for
+        # constructing the k-conjunctions
+        candidate = None
+        covered_comparisons: frozenset[RecordIDPair] | InfiniteSet = InfiniteSet()
+        covered_matches: frozenset[int] | InfiniteSet = InfiniteSet()
+        covered_sample_matches = InfiniteSet()
+
+        def score(predicate: Predicate) -> float:
+            try:
+                return len(
+                    covered_sample_matches & sample_match_cover[predicate]
+                ) / len(covered_comparisons & comparison_cover[predicate])
+            except ZeroDivisionError:
+                return 0.0
+
+        for _ in range(K):
+            next_predicate = max(sample_predicates, key=score)
+            if candidate:
+                candidate += next_predicate
+            else:
+                candidate = next_predicate
+
+            covered_comparisons &= comparison_cover[next_predicate]
+            candidate.cover_count = len(covered_comparisons)
+
+            covered_matches &= match_cover[next_predicate]
+            candidates[candidate] = covered_matches
+
+            covered_sample_matches &= sample_match_cover[next_predicate]
+
+            sample_predicates.remove(next_predicate)
+
+    return candidates
+
+
 class InfiniteSet:
     def __and__(self, item):
         return item
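
Note on the labeler change: the new `pop()` replaces argmax-style selection with weighted random sampling over three regimes. Below is a minimal, self-contained sketch of that policy, assuming (as the diff implies) that column 0 of `probs` holds the match classifier's score and column 1 holds the blocking learner's 0/1 coverage score; the toy values are invented for illustration.

```python
import numpy

rng = numpy.random.default_rng(42)

# Toy scores for six candidate pairs; the values are made up.
# Column 0: match classifier's probability of a match.
# Column 1: blocking learner's score, 1.0 if some blocking rule
#           covers the pair, else 0.0 (an assumption of this sketch).
probs = numpy.array(
    [
        [0.9, 0.0],  # likely match, missed by blocking
        [0.7, 0.0],  # likely match, missed by blocking
        [0.2, 1.0],  # covered, likely non-match
        [0.8, 1.0],  # covered, likely match
        [0.4, 0.0],  # missed by blocking, likely non-match
        [0.1, 1.0],  # covered, likely non-match
    ]
)

decisions = probs > 0.5
# pairs where the learners disagree *and* no blocking rule covers them
uncovered_disagreement = numpy.any(decisions != decisions[:, [0]], axis=1) * (
    probs[:, 1] == 0
)

if uncovered_disagreement.any():
    # weight by match confidence: likely matches that blocking misses
    # matter most for recall
    weights = uncovered_disagreement * probs[:, 0]
    weights /= weights.sum()
    index = rng.choice(len(probs), p=weights)
elif (probs[:, 1] == 1).any():
    # among covered pairs, take the one closest to a uniformly drawn
    # confidence target, so labels don't skew toward negatives
    covered = (probs[:, 1] == 1).nonzero()[0]
    target = rng.random()
    index = covered[numpy.argmin(numpy.absolute(probs[covered, 0] - target))]
else:
    # fall back to sampling in proportion to classifier disagreement
    weights = numpy.std(probs, axis=1)
    weights /= weights.sum()
    index = rng.choice(len(probs), p=weights)

print("next pair to label:", index)  # picks pair 0 or 1 with this data
```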
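And a hypothetical mini-example of the greedy scoring loop inside `random_forest_candidates`: each step picks the predicate that maximizes matches-still-covered over comparisons-still-covered, i.e. the precision of the growing conjunction. Plain frozensets stand in for the `Cover`/`ComparisonCover` types; the bootstrap resampling (`Resampler`) and the `InfiniteSet` trick are omitted, and the predicate names and counts are made up.

```python
# invented toy covers: which matched pairs and which comparisons
# each blocking predicate would cover
match_cover = {
    "same_zip": frozenset({1, 2, 3}),
    "same_name": frozenset({2, 3}),
    "same_phone": frozenset({3}),
}
comparison_cover = {
    "same_zip": frozenset(range(100)),  # broad: blocks many comparisons
    "same_name": frozenset(range(10)),  # narrower
    "same_phone": frozenset(range(3)),  # narrowest
}


def greedy_conjunction(K: int) -> list[str]:
    pool = list(match_cover)
    # stand-ins for InfiniteSet: start from the full universe so the
    # first intersection is a no-op
    covered_matches = frozenset.union(*match_cover.values())
    covered_comparisons = frozenset.union(*comparison_cover.values())
    conjunction: list[str] = []
    for _ in range(K):

        def score(pred: str) -> float:
            # precision of the conjunction-so-far extended with `pred`
            n_comparisons = len(covered_comparisons & comparison_cover[pred])
            if not n_comparisons:
                return 0.0
            return len(covered_matches & match_cover[pred]) / n_comparisons

        best = max(pool, key=score)
        conjunction.append(best)
        covered_matches &= match_cover[best]
        covered_comparisons &= comparison_cover[best]
        pool.remove(best)
    return conjunction


print(greedy_conjunction(K=2))  # ['same_phone', 'same_zip']
```

The narrow `same_phone` wins the first round (1 match / 3 comparisons beats 3/100), which mirrors why the diff intersects both covers after each pick: later predicates are scored only on what the conjunction still covers.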