diff --git a/setup.py b/setup.py index 70cfad8..b6cdca2 100644 --- a/setup.py +++ b/setup.py @@ -7,9 +7,10 @@ 'python-Levenshtein', 'thefuzz', 'modAL', + 'cardinal', 'pytest', 'multipledispatch', - 'dill', + 'dill==0.2', 'graphframes', 'scipy' ] diff --git a/spark_matcher/activelearner/__init__.py b/spark_matcher/activelearner/__init__.py index abfd5f0..645c54c 100644 --- a/spark_matcher/activelearner/__init__.py +++ b/spark_matcher/activelearner/__init__.py @@ -1,3 +1,4 @@ -__all__ = ['ScoringLearner'] +__all__ = ['ConfidenceLearner', 'DiverseMiniBatchLearner'] -from .active_learner import ScoringLearner \ No newline at end of file +from .active_learner import ConfidenceLearner +from .batch_active_learner import DiverseMiniBatchLearner \ No newline at end of file diff --git a/spark_matcher/activelearner/active_learner.py b/spark_matcher/activelearner/active_learner.py index 01b0053..10228ea 100644 --- a/spark_matcher/activelearner/active_learner.py +++ b/spark_matcher/activelearner/active_learner.py @@ -1,18 +1,14 @@ -# Authors: Ahmet Bayraktar -# Stan Leisink -# Frits Hermans - -from typing import List, Optional, Union - +from typing import List, Union import numpy as np import pandas as pd from modAL.models import ActiveLearner from modAL.uncertainty import uncertainty_sampling from pyspark.sql import DataFrame from sklearn.base import BaseEstimator +from spark_matcher.activelearner.active_learner_base import ActiveLearnerBase -class ScoringLearner: +class ConfidenceLearner(ActiveLearnerBase): """ Class to train a string matching model using active learning. Attributes: @@ -32,113 +28,16 @@ def __init__(self, col_names: List[str], scorer: BaseEstimator, min_nr_samples: uncertainty_threshold: float = 0.1, uncertainty_improvement_threshold: float = 0.01, n_uncertainty_improvement: int = 5, n_queries: int = 9999, sampling_method=uncertainty_sampling, verbose: int = 0): - self.col_names = col_names + super().__init__(col_names, min_nr_samples, uncertainty_threshold, uncertainty_improvement_threshold, + n_uncertainty_improvement, verbose) self.learner = ActiveLearner( estimator=scorer, query_strategy=sampling_method ) - self.counter_total = 0 - self.counter_positive = 0 - self.counter_negative = 0 - self.min_nr_samples = min_nr_samples - self.uncertainty_threshold = uncertainty_threshold - self.uncertainty_improvement_threshold = uncertainty_improvement_threshold - self.n_uncertainty_improvement = n_uncertainty_improvement - self.uncertainties = [] self.n_queries = n_queries - self.verbose = verbose - - def _input_assert(self, message: str, choices: List[str]) -> str: - """ - Adds functionality to the python function `input` to limit the choices that can be returned - Args: - message: message to user - choices: list containing possible choices that can be returned - Returns: - input returned by user - """ - output = input(message).lower() - if output not in choices: - print(f"Wrong input! Your input should be one of the following: {', '.join(choices)}") - return self._input_assert(message, choices) - else: - return output - - def _get_uncertainty_improvement(self) -> Optional[float]: - """ - Calculates the uncertainty differences during active learning. The largest difference over the `last_n` - iterations is returned. The aim of this function is to suggest early stopping of active learning. 
- - Returns: largest uncertainty update in `last_n` iterations - - """ - uncertainties = np.asarray(self.uncertainties) - abs_differences = abs(uncertainties[1:] - uncertainties[:-1]) - return max(abs_differences[-self.n_uncertainty_improvement:]) - - def _is_converged(self) -> bool: - """ - Checks whether the model is converged by comparing the last uncertainty value with the `uncertainty_threshold` - and comparing the `last_n` uncertainty improvements with the `uncertainty_improvement_threshold`. These checks - are only performed if at least `min_nr_samples` are labelled. - - Returns: - boolean indicating whether the model is converged - - """ - if (self.counter_total >= self.min_nr_samples) and ( - len(self.uncertainties) >= self.n_uncertainty_improvement + 1): - uncertainty_improvement = self._get_uncertainty_improvement() - if (self.uncertainties[-1] <= self.uncertainty_threshold) or ( - uncertainty_improvement <= self.uncertainty_improvement_threshold): - return True - else: - return False - def _get_active_learning_input(self, query_inst: pd.DataFrame) -> np.ndarray: - """ - Obtain user input for a query during active learning. - Args: - query_inst: query as provided by the ActiveLearner instance - Returns: label of user input '1' or '0' as yes or no - 'p' to go to previous - 'f' to finish - 's' to skip the query - """ - print(f'\nNr. {self.counter_total + 1} ({self.counter_positive}+/{self.counter_negative}-)') - print("Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish") - print('') - for element in [1, 2]: - for col_name in self.col_names: - print(f'{col_name}_{element}' + ': ' + query_inst[f'{col_name}_{element}'].iloc[0]) - print('') - user_input = self._input_assert("", ['y', 'n', 'p', 'f', 's']) - # replace 'y' and 'n' with '1' and '0' to make them valid y labels - user_input = user_input.replace('y', '1').replace('n', '0') - - y_new = np.array([user_input]) - return y_new - - def _calculate_uncertainty(self, x) -> None: - # take the maximum probability of the predicted classes as proxy of the confidence of the classifier - confidence = self.predict_proba(x).max(axis=1)[0] - if self.verbose: - print('uncertainty:', 1 - confidence) - self.uncertainties.append(1 - confidence) - - def _show_min_max_scores(self, X: pd.DataFrame) -> None: - """ - Prints the lowest and the highest logistic regression scores on train data during active learning. - - Args: - X: Pandas dataframe containing train data that is available for labelling duringg active learning - """ - X_all = pd.concat((X, self.train_samples)) - pred_max = self.learner.predict_proba(np.array(X_all['similarity_metrics'].tolist())).max(axis=0) - print(f'lowest score: {1 - pred_max[0]:.3f}') - print(f'highest score: {pred_max[1]:.3f}') - def _label_perfect_train_matches(self, identical_records: pd.DataFrame) -> None: + def label_perfect_train_matches(self, identical_records: pd.DataFrame) -> None: """ To prevent asking labels for the perfect matches that were created by setting `n_perfect_train_matches`, these are provided to the active learner upfront. 
@@ -152,7 +51,7 @@ def _label_perfect_train_matches(self, identical_records: pd.DataFrame) -> None: identical_records['y'].values) self.train_samples = pd.concat([self.train_samples, identical_records]) - def fit(self, X: pd.DataFrame) -> 'ScoringLearner': + def fit(self, X: pd.DataFrame) -> 'ConfidenceLearner': """ Fit ScoringLearner instance on pairs of strings Args: @@ -163,22 +62,20 @@ def fit(self, X: pd.DataFrame) -> 'ScoringLearner': # automatically label all perfect train matches: identical_records = X[X['perfect_train_match']].copy() - self._label_perfect_train_matches(identical_records) - X = X.drop(identical_records.index).reset_index(drop=True) # remove identical records to avoid double labelling - - for i in range(self.n_queries): + self.label_perfect_train_matches(identical_records) + # remove identical records to avoid double labelling + X = X.drop(identical_records.index).reset_index(drop=True) + for _ in range(self.n_queries): query_idx, query_inst = self.learner.query(np.array(X['similarity_metrics'].tolist())) - if self.learner.estimator.fitted_: # the uncertainty calculations need a fitted estimator # however it can occur that the estimator can only be fit after a couple rounds of querying - self._calculate_uncertainty(query_inst) + self.calculate_uncertainty(query_inst) if self.verbose >= 2: - self._show_min_max_scores(X) - - y_new = self._get_active_learning_input(X.iloc[query_idx]) + self.show_min_max_scores(X) + y_new = self.get_active_learning_input(X.iloc[query_idx].iloc[0]) if y_new == 'p': # use previous (input is 'p') - y_new = self._get_active_learning_input(query_inst_prev) + y_new = self.get_active_learning_input(query_inst_prev.iloc[0]) elif y_new == 'f': # finish labelling (input is 'f') break query_inst_prev = X.iloc[query_idx] @@ -187,12 +84,9 @@ def fit(self, X: pd.DataFrame) -> 'ScoringLearner': train_sample_to_add = X.iloc[query_idx].copy() train_sample_to_add['y'] = y_new self.train_samples = pd.concat([self.train_samples, train_sample_to_add]) - X = X.drop(query_idx).reset_index(drop=True) - - if self._is_converged(): + if self.is_converged(): print("Classifier converged, enter 'f' to stop training") - if y_new == '1': self.counter_positive += 1 elif y_new == '0': @@ -207,4 +101,4 @@ def predict_proba(self, X: Union[DataFrame, pd.DataFrame]) -> Union[DataFrame, p X: Pandas or Spark dataframe to predict on Returns: match probabilities """ - return self.learner.estimator.predict_proba(X) + return self.learner.estimator.predict_proba(X) \ No newline at end of file diff --git a/spark_matcher/activelearner/active_learner_base.py b/spark_matcher/activelearner/active_learner_base.py new file mode 100644 index 0000000..465bc78 --- /dev/null +++ b/spark_matcher/activelearner/active_learner_base.py @@ -0,0 +1,167 @@ +from typing import List, Optional +from abc import ABC, abstractmethod +import numpy as np +import pandas as pd + + +class ActiveLearnerBase(ABC): + """ + A base class for active learning approaches. 
+    Attributes:
+        col_names: column names used for matching
+        scorer: the scorer to be used in the active learning loop (provided by the subclass)
+        min_nr_samples: minimum number of responses required before classifier convergence is tested
+        uncertainty_threshold: threshold on the uncertainty of the classifier during active learning,
+            used for determining if the model has converged
+        uncertainty_improvement_threshold: threshold on the uncertainty improvement of classifier during active
+            learning, used for determining if the model has converged
+        n_uncertainty_improvement: span of iterations to check for largest difference between uncertainties
+        verbose: sets verbosity
+    """
+    def __init__(self, col_names: List[str], min_nr_samples: int = 10,
+                 uncertainty_threshold: float = 0.1, uncertainty_improvement_threshold: float = 0.01,
+                 n_uncertainty_improvement: int = 5, verbose: int = 0):
+        self.col_names = col_names
+        self.counter_total = 0
+        self.counter_positive = 0
+        self.counter_negative = 0
+        self.min_nr_samples = min_nr_samples
+        self.uncertainty_threshold = uncertainty_threshold
+        self.uncertainty_improvement_threshold = uncertainty_improvement_threshold
+        self.n_uncertainty_improvement = n_uncertainty_improvement
+        self.uncertainties = []
+        self.train_samples = pd.DataFrame([])
+        self.verbose = verbose
+
+    def input_assert(self, message: str, choices: List[str]) -> str:
+        """
+        Adds functionality to the python function `input` to limit the choices that can be returned
+        Args:
+            message: message to user
+            choices: list containing possible choices that can be returned
+        Returns:
+            input returned by user
+        """
+        output = input(message).lower()
+        if output not in choices:
+            print(f"Wrong input! Your input should be one of the following: {', '.join(choices)}")
+            return self.input_assert(message, choices)
+        return output
+
+    def get_uncertainty_improvement(self) -> Optional[float]:
+        """
+        Calculates the uncertainty differences during active learning. The largest difference over the `last_n`
+        iterations is returned. The aim of this function is to suggest early stopping of active learning.
+
+        Returns: largest uncertainty update in `last_n` iterations
+
+        """
+        uncertainties = np.asarray(self.uncertainties)
+        abs_differences = abs(uncertainties[1:] - uncertainties[:-1])
+        return max(abs_differences[-self.n_uncertainty_improvement:])
+
+    def is_converged(self) -> bool:
+        """
+        Checks whether the model is converged by comparing the last uncertainty value with the `uncertainty_threshold`
+        and comparing the `last_n` uncertainty improvements with the `uncertainty_improvement_threshold`. These checks
+        are only performed if at least `min_nr_samples` are labelled.
+
+        Returns:
+            boolean indicating whether the model is converged
+
+        """
+        if (self.counter_total >= self.min_nr_samples) and (
+                len(self.uncertainties) >= self.n_uncertainty_improvement + 1):
+            uncertainty_improvement = self.get_uncertainty_improvement()
+            if (self.uncertainties[-1] <= self.uncertainty_threshold) or (
+                    uncertainty_improvement <= self.uncertainty_improvement_threshold):
+                return True
+            else:
+                return False
+
+    def get_active_learning_input(self, x: pd.Series) -> np.ndarray:
+        """
+        Obtain user input for a query during active learning.
+        Args:
+            x: Pandas Series containing the record pair to be labelled
+        Returns: label of user input '1' or '0' as yes or no
+            'p' to go to previous
+            'f' to finish
+            's' to skip the query
+        """
+        print(f'\nNr. {self.counter_total + 1} ({self.counter_positive}+/{self.counter_negative}-)')
+        print("Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish")
+        print(' ')
+        for element in [1, 2]:
+            for col_name in self.col_names:
+                print(f'{col_name}_{element}' + ': ' + x[f'{col_name}_{element}'])
+            print('')
+        user_input = self.input_assert("", choices=['y', 'n', 'p', 'f', 's'])
+        # replace 'y' and 'n' with '1' and '0' to make them valid y labels
+        user_input = user_input.replace('y', '1').replace('n', '0')
+        y_new = np.array([user_input])
+        return y_new
+
+    def _batch_uncertainty(self, x: np.ndarray) -> None:
+        """
+        Calculates the average uncertainty of the classifier for a given batch of data, with a bootstrapped lower and upper confidence bound
+        """
+        classwise_certainty = self.predict_proba(x)
+        uncertainty = 1 - np.max(classwise_certainty, axis=1)
+        idx = np.arange(uncertainty.shape[0])
+        rng = np.random.RandomState(seed=1234)
+        samples_uncertainty = []
+        for _ in range(200):
+            pred_idx = rng.choice(idx, size=idx.shape[0], replace=True)
+            uncertainty_boot = np.mean(uncertainty[pred_idx])
+            samples_uncertainty.append(uncertainty_boot)
+        bootstrap_mean = np.mean(samples_uncertainty)
+        ci_lower = np.percentile(samples_uncertainty, 2.5)
+        ci_upper = np.percentile(samples_uncertainty, 97.5)
+        if self.verbose:
+            print(f"""The average uncertainty of the model for the given batch is {round(bootstrap_mean, ndigits=3)},
+            with a 95% confidence interval of [{round(ci_lower, ndigits=3)}, {round(ci_upper, ndigits=3)}].""")
+        self.uncertainties.append(round(bootstrap_mean, ndigits=3))
+
+    def calculate_uncertainty(self, x: np.ndarray) -> None:
+        # take the maximum probability of the predicted classes as a proxy for the confidence of the classifier
+        if x.shape[0] > 1:
+            self._batch_uncertainty(x)
+        else:
+            confidence = self.predict_proba(x).max(axis=1)[0]
+            if self.verbose:
+                print('The uncertainty of the selected sample is:', round(1 - confidence, ndigits=3))
+            self.uncertainties.append(round(1 - confidence, ndigits=3))
+
+    def show_min_max_scores(self, X: pd.DataFrame) -> None:
+        """
+        Prints the lowest and the highest logistic regression scores on train data during active learning.
+
+        Args:
+            X: Pandas dataframe containing train data that is available for labelling during active learning
+        """
+        X_all = pd.concat((X, self.train_samples))
+        pred_max = self.predict_proba(np.array(X_all['similarity_metrics'].tolist())).max(axis=0)
+        print(f"""The lowest and highest scores of the model for the entire dataset are:
+            [{1 - pred_max[0]:.3f}, {pred_max[1]:.3f}]""")
+
+    @abstractmethod
+    def label_perfect_train_matches(self, *args, **kwargs) -> None:
+        """
+        To prevent asking labels for the perfect matches, this function provides them to the active learner upfront.
+ """ + pass + + @abstractmethod + def fit(self, *args, **kwargs): + """ + fit the active learner instance on data + """ + pass + + @abstractmethod + def predict_proba(self, *args, **kwargs): + """ + predict results using trained model + """ + pass \ No newline at end of file diff --git a/spark_matcher/activelearner/batch_active_learner.py b/spark_matcher/activelearner/batch_active_learner.py new file mode 100644 index 0000000..612e1cc --- /dev/null +++ b/spark_matcher/activelearner/batch_active_learner.py @@ -0,0 +1,155 @@ +from typing import List, Union +from cardinal.zhdanov2019 import TwoStepKMeansSampler +import numpy as np +import pandas as pd +from sklearn.base import BaseEstimator +from spark_matcher.activelearner.active_learner_base import ActiveLearnerBase + + +class TwoStepKMeansSamplerExtended(TwoStepKMeansSampler): + """Extends TwoStepKMeansSampler class to include uncertainty score + """ + def __init__(self, beta: int, classifier, batch_size: int, + assume_fitted: bool = False, verbose: int = 0, **kmeans_args): + super().__init__(beta, classifier, batch_size, assume_fitted, verbose, **kmeans_args) + self.estimator = self.sampler_list[0].classifier_ + + +class DiverseMiniBatchLearner(ActiveLearnerBase): + """ + Class to train a string matching model using active learning. + Attributes: + col_names: column names used for matching + scorer: the scorer to be used in the active learning loop + batch_size: the size of batch for labeling + min_nr_batch: minimum number of batch required before classifier convergence is tested + uncertainty_threshold: threshold on the uncertainty of the classifier during active learning, + used for determining if the model has converged + uncertainty_improvement_threshold: threshold on the uncertainty improvement of classifier during active + learning, used for determining if the model has converged + n_uncertainty_improvement: span of iterations to check for largest difference between uncertainties + n_queries: maximum number of iterations to be done for the active learning session + verbose: sets verbosity + """ + def __init__(self, col_names: List[str], scorer: BaseEstimator, beta: int = 5, + batch_size: int = 5, min_nr_batch: int = 2, + uncertainty_threshold: float = 0.2, uncertainty_improvement_threshold: float = 0.01, + n_uncertainty_improvement: int = 1, verbose: int = 0 + ): + min_nr_samples = min_nr_batch * batch_size + super().__init__(col_names, min_nr_samples, uncertainty_threshold, uncertainty_improvement_threshold, + n_uncertainty_improvement, verbose + ) + self.batch_size = batch_size + self.learner = TwoStepKMeansSamplerExtended(beta, + classifier=scorer, + batch_size=batch_size + ) + + def _process_input_batch(self, query_inst: pd.DataFrame) -> dict: + """ + Process user input for a give sample of data + Args: + query_inst (pd.DataFrame): sample of data to be labelled by user + + Returns: + dict: label of user input as a dict, for example {'index number': '1'} + """ + # checking after each batch if the model is converged + if self.is_converged(): + print("Classifier converged, enter 'f' to stop training") + + y_new = {} + for index, row in query_inst.iterrows(): + user_input = self.get_active_learning_input(row) + + if user_input == 'p': + if y_new: + prev_index = list(y_new.keys())[-1] + user_input_prev = self.get_active_learning_input(query_inst.loc[prev_index]) + y_new[prev_index] = user_input_prev + else: + print('Model is already trained on previous batch') + # asking again the current sample to be labeled + user_input = 
+            # update the label counters
+            if user_input == '1':
+                self.counter_positive += 1
+            elif user_input == '0':
+                self.counter_negative += 1
+            elif user_input == 'f':
+                y_new[index] = user_input
+                return y_new
+            self.counter_total += 1
+            y_new[index] = user_input
+        return y_new
+
+    def label_perfect_train_matches(self, identical_records: pd.DataFrame) -> None:
+        """
+        To prevent asking labels for the perfect matches that were created by setting `n_perfect_train_matches`, these
+        are provided to the active learner upfront.
+
+        Args:
+            identical_records: Pandas dataframe containing perfect matches
+
+        """
+        identical_records['y'] = '1'
+        # add one negative sample so that the model is trained on both classes
+        n_feature = len(identical_records['similarity_metrics'].values.tolist()[0])
+        x_perfect = np.append(np.array(identical_records['similarity_metrics'].values.tolist()),
+                              np.zeros((1, n_feature)), axis=0)
+        y_perfect = np.append(identical_records['y'].values, np.array(['0']), axis=0)
+
+        # fit the learner
+        self.learner.fit(x_perfect, y_perfect)
+        self.train_samples = pd.concat([self.train_samples, identical_records])
+
+    def fit(self, X: pd.DataFrame) -> 'DiverseMiniBatchLearner':
+        """
+        Fit the DiverseMiniBatchLearner instance on pairs of strings
+        Args:
+            X: Pandas dataframe containing pairs of strings and distance metrics of paired strings
+        """
+        # automatically label all perfect train matches:
+        identical_records = X[X['perfect_train_match']].copy()
+        self.label_perfect_train_matches(identical_records)
+        # remove identical records to avoid double labelling
+        X = X.drop(identical_records.index).reset_index(drop=True)
+        # number of iterations over batches
+        n_iter = int(X.shape[0] // self.batch_size)
+        for i in range(n_iter):
+            # select the next batch from the pool
+            query_index = self.learner.select_samples(np.array(X['similarity_metrics'].tolist()))
+            # before labelling, report the current batch uncertainty and the lowest/highest prediction scores
+            if self.verbose >= 2:
+                self.calculate_uncertainty(np.array(X.iloc[query_index]['similarity_metrics'].tolist()))
+                self.show_min_max_scores(X)
+            # label the selected batch from the pool
+            y_new = self._process_input_batch(X.iloc[query_index])
+            # stop if the user decides to finish labelling
+            if 'f' in y_new.values():
+                break
+            # keep the labelled samples, dropping skipped ('s') and previous ('p') responses
+            removed_skipped_feedback = {key: value[0] for key, value in y_new.items()
+                                        if (value != 's') and (value != 'p')}
+            train_sample_to_add = X.iloc[[*removed_skipped_feedback]].copy()
+            train_sample_to_add['y'] = np.array(list(removed_skipped_feedback.values()))
+            self.train_samples = pd.concat([self.train_samples, train_sample_to_add])
+            # update the pool by removing the batch that has just been labelled
+            X = X.drop([*removed_skipped_feedback]).reset_index(drop=True)
+            # retrain the model with the newly labelled data
+            self.learner.fit(
+                np.array(self.train_samples['similarity_metrics'].values.tolist()),
+                self.train_samples['y'].values)
+            if self.verbose >= 2:
+                print(f"Batch number {i + 1} of labelling is done.")
+        return self
+
+    def predict_proba(self, X: Union[DataFrame, pd.DataFrame]) -> Union[DataFrame, pd.DataFrame]:
+        """
+        Predict probabilities on new data whether the pairs are a match or not
+        Args:
+            X: Pandas or Spark dataframe to predict on
+        Returns: match probabilities
+        """
+        return self.learner.sampler_list[0].classifier_.predict_proba(X)
\ No newline at end of file
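Usage sketch for the new DiverseMiniBatchLearner (not part of the diff). The toy dataframe below is illustrative; in practice `fit` receives the sampled pairs table that MatchingBase builds, which already contains the `perfect_train_match`, `similarity_metrics` and `<col>_1`/`<col>_2` columns used here, and `spark_session` is assumed to be an existing SparkSession as in the tests:

import numpy as np
import pandas as pd

from spark_matcher.activelearner import DiverseMiniBatchLearner
from spark_matcher.scorer.scorer import Scorer

scorer = Scorer(spark_session)  # assumes an existing SparkSession
learner = DiverseMiniBatchLearner(col_names=['name'], scorer=scorer, beta=5, batch_size=2, min_nr_batch=1)

# illustrative candidate pairs; 'similarity_metrics' holds one feature vector per pair
pairs = pd.DataFrame({
    'name_1': ['amazon', 'amazon', 'netflx'],
    'name_2': ['amazon', 'netflix', 'netflix'],
    'similarity_metrics': [[100, 100], [35, 30], [85, 83]],
    'perfect_train_match': [True, False, False],
})

learner.fit(pairs)  # interactive session: labels are requested in diverse batches of batch_size pairs
scores = learner.predict_proba(np.array(pairs['similarity_metrics'].tolist()))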
diff --git a/spark_matcher/deduplicator/deduplicator.py b/spark_matcher/deduplicator/deduplicator.py
index 33dd8c1..322eb91 100644
--- a/spark_matcher/deduplicator/deduplicator.py
+++ b/spark_matcher/deduplicator/deduplicator.py
@@ -3,10 +3,8 @@
 # Frits Hermans
 
 from typing import Optional, List, Dict
-
 from pyspark.sql import DataFrame, SparkSession, functions as F, types as T
 from sklearn.exceptions import NotFittedError
-
 from spark_matcher.blocker.blocking_rules import BlockingRule
 from spark_matcher.deduplicator.connected_components_calculator import ConnectedComponentsCalculator
 from spark_matcher.deduplicator.hierarchical_clustering import apply_deduplication
@@ -42,6 +40,8 @@ class Deduplicator(MatchingBase):
         ratio_hashed_samples: ratio of hashed samples to be created for training, rest is sampled randomly
         n_perfect_train_matches: nr of perfect matches used for training
         scorer: a Scorer object used for scoring pairs
+        active_learning_method: active learning method used for labelling data; two available options: 'uncertainty'
+            and 'diverse_batch'. The default option is 'uncertainty'
         verbose: sets verbosity
         max_edges_clustering: max number of edges per component that enters clustering
         edge_filter_thresholds: list of score thresholds to use for filtering when components are too large
@@ -52,14 +52,15 @@ def __init__(self, spark_session: SparkSession, col_names: Optional[List[str]] =
                  field_info: Optional[Dict] = None, blocking_rules: Optional[List[BlockingRule]] = None,
                  blocking_recall: float = 1.0, table_checkpointer: Optional[TableCheckpointer] = None,
                  checkpoint_dir: Optional[str] = None, n_perfect_train_matches=1, n_train_samples: int = 100_000,
-                 ratio_hashed_samples: float = 0.5, scorer: Optional[Scorer] = None, verbose: int = 0,
+                 ratio_hashed_samples: float = 0.5, scorer: Optional[Scorer] = None,
+                 active_learning_method: str = 'uncertainty', batch_size: int = 5, verbose: int = 0,
                  max_edges_clustering: int = 500_000,
                  edge_filter_thresholds: List[float] = [0.45, 0.55, 0.65, 0.75, 0.85, 0.95],
                  cluster_score_threshold: float = 0.5):
         super().__init__(spark_session, table_checkpointer, checkpoint_dir, col_names, field_info, blocking_rules,
-                         blocking_recall, n_perfect_train_matches, n_train_samples, ratio_hashed_samples, scorer,
-                         verbose)
+                         blocking_recall, n_perfect_train_matches, n_train_samples, ratio_hashed_samples, scorer,
+                         active_learning_method, batch_size, verbose)
         self.fitted_ = False
 
         self.max_edges_clustering = max_edges_clustering
diff --git a/spark_matcher/matcher/matcher.py b/spark_matcher/matcher/matcher.py
index 1c6df17..22b335c 100644
--- a/spark_matcher/matcher/matcher.py
+++ b/spark_matcher/matcher/matcher.py
@@ -3,12 +3,10 @@
 # Frits Hermans
 
 from typing import Optional, List, Dict
-
 from pyspark.sql import DataFrame, functions as F
 from pyspark.sql import SparkSession
 from pyspark.sql import Window
 from sklearn.exceptions import NotFittedError
-
 from spark_matcher.blocker.blocking_rules import BlockingRule
 from spark_matcher.matching_base.matching_base import MatchingBase
 from spark_matcher.scorer.scorer import Scorer
@@ -40,16 +38,19 @@ class Matcher(MatchingBase):
         ratio_hashed_samples: ratio of hashed samples to be created for training, rest is sampled randomly
         n_perfect_train_matches: nr of perfect matches used for training
         scorer: a Scorer object used for scoring pairs
+        active_learning_method: active learning method used for labelling data; two available options: 'uncertainty'
+            and 'diverse_batch'. The default method is 'uncertainty'.
         verbose: sets verbosity
     """
     def __init__(self, spark_session: SparkSession, table_checkpointer: Optional[TableCheckpointer]=None,
                  checkpoint_dir: Optional[str]=None, col_names: Optional[List[str]] = None,
                  field_info: Optional[Dict] = None, blocking_rules: Optional[List[BlockingRule]] = None,
                  blocking_recall: float = 1.0, n_perfect_train_matches=1, n_train_samples: int = 100_000,
-                 ratio_hashed_samples: float = 0.5, scorer: Optional[Scorer] = None, verbose: int = 0):
+                 ratio_hashed_samples: float = 0.5, scorer: Optional[Scorer] = None,
+                 active_learning_method: str = 'uncertainty', batch_size: int = 5, verbose: int = 0):
         super().__init__(spark_session, table_checkpointer, checkpoint_dir, col_names, field_info, blocking_rules,
-                         blocking_recall, n_perfect_train_matches, n_train_samples, ratio_hashed_samples, scorer,
-                         verbose)
+                         blocking_recall, n_perfect_train_matches, n_train_samples, ratio_hashed_samples, scorer,
+                         active_learning_method, batch_size, verbose)
         self.fitted_ = False
 
     def _create_predict_pairs_table(self, sdf_1_blocked: DataFrame, sdf_2_blocked: DataFrame) -> DataFrame:
diff --git a/spark_matcher/matching_base/matching_base.py b/spark_matcher/matching_base/matching_base.py
index 0d1e518..784ac16 100644
--- a/spark_matcher/matching_base/matching_base.py
+++ b/spark_matcher/matching_base/matching_base.py
@@ -4,14 +4,12 @@
 import warnings
 
 from typing import Optional, List, Dict
-
 import dill
 from pyspark.sql import DataFrame, functions as F, SparkSession
 import numpy as np
 import pandas as pd
 from thefuzz.fuzz import token_set_ratio, token_sort_ratio
-
-from spark_matcher.activelearner.active_learner import ScoringLearner
+from spark_matcher.activelearner import ConfidenceLearner, DiverseMiniBatchLearner
 from spark_matcher.blocker.block_learner import BlockLearner
 from spark_matcher.sampler.training_sampler import HashSampler, RandomSampler
 from spark_matcher.scorer.scorer import Scorer
@@ -21,12 +19,13 @@ class MatchingBase:
-
+
     def __init__(self, spark_session: SparkSession, table_checkpointer: Optional[TableCheckpointer] = None,
                  checkpoint_dir: Optional[str] = None, col_names: Optional[List[str]] = None,
                  field_info: Optional[Dict] = None, blocking_rules: Optional[List[BlockingRule]] = None,
                  blocking_recall: float = 1.0, n_perfect_train_matches=1, n_train_samples: int = 100_000,
-                 ratio_hashed_samples: float = 0.5, scorer: Optional[Scorer] = None, verbose: int = 0):
+                 ratio_hashed_samples: float = 0.5, scorer: Optional[Scorer] = None,
+                 active_learning_method: str = 'uncertainty', batch_size: int = 5, verbose: int = 0):
         self.spark_session = spark_session
         self.table_checkpointer = table_checkpointer
         if not self.table_checkpointer and checkpoint_dir:
@@ -56,7 +55,16 @@ def __init__(self, spark_session: SparkSession, table_checkpointer: Optional[Tab
         if not scorer:
             scorer = Scorer(self.spark_session)
-        self.scoring_learner = ScoringLearner(self.col_names, scorer, verbose=self.verbose)
+
+        if active_learning_method == 'uncertainty':
+            self.scoring_learner = ConfidenceLearner(self.col_names, scorer, verbose=self.verbose)
+        elif active_learning_method == 'diverse_batch':
+            self.scoring_learner = DiverseMiniBatchLearner(self.col_names, scorer,
+                                                           batch_size=batch_size, verbose=self.verbose)
+        else:
+            warnings.warn("active_learning_method is not properly defined, falling back to the default 'uncertainty' method")
+            self.scoring_learner = ConfidenceLearner(self.col_names, scorer, verbose=self.verbose)
+
 
         self.blocking_rules = blocking_rules
 
         if not self.blocking_rules:
@@ -93,7 +101,7 @@ def load(self, path: str) -> None:
         """
         with open(path, 'rb') as f:
             loaded_obj = dill.load(f)
-
+
         # the spark session that was removed before saving needs to be filled with the spark session of this instance
         loaded_obj['spark_session'] = self.spark_session
         setattr(loaded_obj['scoring_learner'].learner.estimator, 'spark_session', self.spark_session)
@@ -186,11 +194,11 @@ def _create_blocklearning_input(self, metrics_table: pd.DataFrame, threshold: in
 
         # get labelled positives from activelearner
         positive_train_labels = (self.scoring_learner.train_samples[self.scoring_learner.train_samples['y'] == '1']
                                  .rename(columns={'y': 'label'}))
+
         metrics_table = pd.concat([metrics_table, positive_train_labels]).drop_duplicates(
             subset=[col + "_1" for col in self.col_names] + [col + "_2" for col in self.col_names], keep='last')
         metrics_table = metrics_table[metrics_table.label == '1']
-
         metrics_table['row_id'] = np.arange(len(metrics_table))
 
         return self.spark_session.createDataFrame(metrics_table)
diff --git a/test/test_active_learner/test_active_learner.py b/test/test_active_learner/test_active_learner.py
index 43f3de5..0714a30 100644
--- a/test/test_active_learner/test_active_learner.py
+++ b/test/test_active_learner/test_active_learner.py
@@ -1,30 +1,58 @@
-from spark_matcher.activelearner.active_learner import ScoringLearner
+from spark_matcher.activelearner import ConfidenceLearner, DiverseMiniBatchLearner
 from spark_matcher.scorer.scorer import Scorer
 
 
 def test__get_uncertainty_improvement(spark_session):
     scorer = Scorer(spark_session)
-    myScoringLearner = ScoringLearner(col_names=[''], scorer=scorer, n_uncertainty_improvement=5)
+    myScoringLearner = ConfidenceLearner(col_names=[''], scorer=scorer, n_uncertainty_improvement=5)
     myScoringLearner.uncertainties = [0.4, 0.2, 0.1, 0.08, 0.05, 0.03]
-    assert myScoringLearner._get_uncertainty_improvement() == 0.2
+    assert myScoringLearner.get_uncertainty_improvement() == 0.2
 
 
 def test__is_converged(spark_session):
     scorer = Scorer(spark_session)
-    myScoringLearner = ScoringLearner(col_names=[''], scorer=scorer, min_nr_samples=5, uncertainty_threshold=0.1,
+    myScoringLearner = ConfidenceLearner(col_names=[''], scorer=scorer, min_nr_samples=5, uncertainty_threshold=0.1,
                                       uncertainty_improvement_threshold=0.01, n_uncertainty_improvement=5)
 
     # insufficient labelled samples
     myScoringLearner.uncertainties = [0.4, 0.39, 0.395]
     myScoringLearner.counter_total = len(myScoringLearner.uncertainties)
-    assert not myScoringLearner._is_converged()
+    assert not myScoringLearner.is_converged()
 
     # too large improvement in last 5 iterations
     myScoringLearner.uncertainties = [0.4, 0.2, 0.19, 0.18, 0.17, 0.16]
     myScoringLearner.counter_total = len(myScoringLearner.uncertainties)
-    assert not myScoringLearner._is_converged()
+    assert not myScoringLearner.is_converged()
 
     # improvement in last 5 iterations below threshold and sufficient labelled cases
     myScoringLearner.uncertainties = [0.19, 0.1, 0.08, 0.05, 0.03, 0.02]
     myScoringLearner.counter_total = len(myScoringLearner.uncertainties)
-    assert myScoringLearner._is_converged()
+    assert myScoringLearner.is_converged()
+
+
+def test__get_uncertainty_improvement_batch(spark_session):
+    scorer = Scorer(spark_session)
+    myScoringLearner = DiverseMiniBatchLearner(col_names=[''], scorer=scorer, n_uncertainty_improvement=5)
+    myScoringLearner.uncertainties = [0.4, 0.2, 0.1, 0.08, 0.05, 0.03]
+    assert myScoringLearner.get_uncertainty_improvement() == 0.2
+
+
+def test__is_converged_batch(spark_session):
+    scorer = Scorer(spark_session)
+    myScoringLearner = DiverseMiniBatchLearner(col_names=[''], scorer=scorer, batch_size=5, min_nr_batch=1, uncertainty_threshold=0.1,
+                                               uncertainty_improvement_threshold=0.01, n_uncertainty_improvement=5)
+
+    # insufficient labelled samples
+    myScoringLearner.uncertainties = [0.4, 0.39, 0.395]
+    myScoringLearner.counter_total = len(myScoringLearner.uncertainties)
+    assert not myScoringLearner.is_converged()
+
+    # too large improvement in last 5 iterations
+    myScoringLearner.uncertainties = [0.4, 0.2, 0.19, 0.18, 0.17, 0.16]
+    myScoringLearner.counter_total = len(myScoringLearner.uncertainties)
+    assert not myScoringLearner.is_converged()
+
+    # improvement in last 5 iterations below threshold and sufficient labelled cases
+    myScoringLearner.uncertainties = [0.19, 0.1, 0.08, 0.05, 0.03, 0.02]
+    myScoringLearner.counter_total = len(myScoringLearner.uncertainties)
+    assert myScoringLearner.is_converged()
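End-to-end, the new constructor arguments are exposed on Matcher and Deduplicator. A minimal sketch (not part of the diff): `sdf_1`, `sdf_2` and the checkpoint directory are assumed placeholders for Spark dataframes containing the columns in `col_names` and a writable path, and the `fit`/`predict` calls follow the existing Matcher API; only `active_learning_method` and `batch_size` are introduced by this change.

from pyspark.sql import SparkSession

from spark_matcher.matcher import Matcher

spark = SparkSession.builder.getOrCreate()

# label diverse mini-batches of 10 candidate pairs per round instead of one pair at a time
myMatcher = Matcher(spark, col_names=['name', 'suburb'], checkpoint_dir='path_to_checkpoints',
                    active_learning_method='diverse_batch', batch_size=10)

myMatcher.fit(sdf_1, sdf_2)                  # interactive active-learning session
result_sdf = myMatcher.predict(sdf_1, sdf_2)

Any other value for active_learning_method falls back to the default 'uncertainty' behaviour with a warning.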