Diverse active learning #9

Open · wants to merge 6 commits into base: main
3 changes: 2 additions & 1 deletion setup.py
@@ -7,9 +7,10 @@
'python-Levenshtein',
'thefuzz',
'modAL',
'cardinal',
'pytest',
'multipledispatch',
'dill',
'dill==0.2',
'graphframes',
'scipy'
]
5 changes: 3 additions & 2 deletions spark_matcher/activelearner/__init__.py
@@ -1,3 +1,4 @@
__all__ = ['ScoringLearner']
__all__ = ['ConfidenceLearner', 'DiverseMiniBatchLearner']

from .active_learner import ScoringLearner
from .active_learner import ConfidenceLearner
from .batch_active_learner import DiverseMiniBatchLearner
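
For orientation: ConfidenceLearner keeps the constructor shown further down in active_learner.py, so a minimal usage sketch looks as follows. The column names and the LogisticRegression stand-in scorer are illustrative assumptions (spark_matcher normally supplies its own scorer), and DiverseMiniBatchLearner is omitted here because its constructor is not part of this diff.

# Minimal usage sketch; column names and the LogisticRegression scorer are assumptions.
from sklearn.linear_model import LogisticRegression
from spark_matcher.activelearner import ConfidenceLearner

learner = ConfidenceLearner(
    col_names=['name', 'address'],           # assumed matching columns
    scorer=LogisticRegression(),             # stand-in for spark_matcher's own scorer
    min_nr_samples=10,
    uncertainty_threshold=0.1,
    uncertainty_improvement_threshold=0.01,
    n_uncertainty_improvement=5,
)
# learner.fit(pairs_df)  # pairs_df needs 'similarity_metrics' and 'perfect_train_match' columns
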
140 changes: 17 additions & 123 deletions spark_matcher/activelearner/active_learner.py
@@ -1,18 +1,14 @@
# Authors: Ahmet Bayraktar
# Stan Leisink
# Frits Hermans

from typing import List, Optional, Union

from typing import List, Union
import numpy as np
import pandas as pd
from modAL.models import ActiveLearner
from modAL.uncertainty import uncertainty_sampling
from pyspark.sql import DataFrame
from sklearn.base import BaseEstimator
from spark_matcher.activelearner.active_learner_base import ActiveLearnerBase


class ScoringLearner:
class ConfidenceLearner(ActiveLearnerBase):
"""
Class to train a string matching model using active learning.
Attributes:
@@ -32,113 +28,16 @@ def __init__(self, col_names: List[str], scorer: BaseEstimator, min_nr_samples:
uncertainty_threshold: float = 0.1, uncertainty_improvement_threshold: float = 0.01,
n_uncertainty_improvement: int = 5, n_queries: int = 9999, sampling_method=uncertainty_sampling,
verbose: int = 0):
self.col_names = col_names
super().__init__(col_names, min_nr_samples, uncertainty_threshold, uncertainty_improvement_threshold,
n_uncertainty_improvement, verbose)
self.learner = ActiveLearner(
estimator=scorer,
query_strategy=sampling_method
)
self.counter_total = 0
self.counter_positive = 0
self.counter_negative = 0
self.min_nr_samples = min_nr_samples
self.uncertainty_threshold = uncertainty_threshold
self.uncertainty_improvement_threshold = uncertainty_improvement_threshold
self.n_uncertainty_improvement = n_uncertainty_improvement
self.uncertainties = []
self.n_queries = n_queries
self.verbose = verbose

def _input_assert(self, message: str, choices: List[str]) -> str:
"""
Adds functionality to the python function `input` to limit the choices that can be returned
Args:
message: message to user
choices: list containing possible choices that can be returned
Returns:
input returned by user
"""
output = input(message).lower()
if output not in choices:
print(f"Wrong input! Your input should be one of the following: {', '.join(choices)}")
return self._input_assert(message, choices)
else:
return output

def _get_uncertainty_improvement(self) -> Optional[float]:
"""
Calculates the uncertainty differences during active learning. The largest difference over the `last_n`
iterations is returned. The aim of this function is to suggest early stopping of active learning.

Returns: largest uncertainty update in `last_n` iterations

"""
uncertainties = np.asarray(self.uncertainties)
abs_differences = abs(uncertainties[1:] - uncertainties[:-1])
return max(abs_differences[-self.n_uncertainty_improvement:])

def _is_converged(self) -> bool:
"""
Checks whether the model is converged by comparing the last uncertainty value with the `uncertainty_threshold`
and comparing the `last_n` uncertainty improvements with the `uncertainty_improvement_threshold`. These checks
are only performed if at least `min_nr_samples` are labelled.

Returns:
boolean indicating whether the model is converged

"""
if (self.counter_total >= self.min_nr_samples) and (
len(self.uncertainties) >= self.n_uncertainty_improvement + 1):
uncertainty_improvement = self._get_uncertainty_improvement()
if (self.uncertainties[-1] <= self.uncertainty_threshold) or (
uncertainty_improvement <= self.uncertainty_improvement_threshold):
return True
else:
return False

def _get_active_learning_input(self, query_inst: pd.DataFrame) -> np.ndarray:
"""
Obtain user input for a query during active learning.
Args:
query_inst: query as provided by the ActiveLearner instance
Returns: label of user input '1' or '0' as yes or no
'p' to go to previous
'f' to finish
's' to skip the query
"""
print(f'\nNr. {self.counter_total + 1} ({self.counter_positive}+/{self.counter_negative}-)')
print("Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish")
print('')
for element in [1, 2]:
for col_name in self.col_names:
print(f'{col_name}_{element}' + ': ' + query_inst[f'{col_name}_{element}'].iloc[0])
print('')
user_input = self._input_assert("", ['y', 'n', 'p', 'f', 's'])
# replace 'y' and 'n' with '1' and '0' to make them valid y labels
user_input = user_input.replace('y', '1').replace('n', '0')

y_new = np.array([user_input])
return y_new

def _calculate_uncertainty(self, x) -> None:
# take the maximum probability of the predicted classes as proxy of the confidence of the classifier
confidence = self.predict_proba(x).max(axis=1)[0]
if self.verbose:
print('uncertainty:', 1 - confidence)
self.uncertainties.append(1 - confidence)

def _show_min_max_scores(self, X: pd.DataFrame) -> None:
"""
Prints the lowest and the highest logistic regression scores on train data during active learning.

Args:
X: Pandas dataframe containing train data that is available for labelling duringg active learning
"""
X_all = pd.concat((X, self.train_samples))
pred_max = self.learner.predict_proba(np.array(X_all['similarity_metrics'].tolist())).max(axis=0)
print(f'lowest score: {1 - pred_max[0]:.3f}')
print(f'highest score: {pred_max[1]:.3f}')

def _label_perfect_train_matches(self, identical_records: pd.DataFrame) -> None:
def label_perfect_train_matches(self, identical_records: pd.DataFrame) -> None:
"""
To prevent asking labels for the perfect matches that were created by setting `n_perfect_train_matches`, these
are provided to the active learner upfront.
@@ -152,7 +51,7 @@ def _label_perfect_train_matches(self, identical_records: pd.DataFrame) -> None:
identical_records['y'].values)
self.train_samples = pd.concat([self.train_samples, identical_records])

def fit(self, X: pd.DataFrame) -> 'ScoringLearner':
def fit(self, X: pd.DataFrame) -> 'ConfidenceLearner':
"""
Fit ScoringLearner instance on pairs of strings
Args:
@@ -163,22 +62,20 @@ def fit(self, X: pd.DataFrame) -> 'ScoringLearner':

# automatically label all perfect train matches:
identical_records = X[X['perfect_train_match']].copy()
self._label_perfect_train_matches(identical_records)
X = X.drop(identical_records.index).reset_index(drop=True) # remove identical records to avoid double labelling

for i in range(self.n_queries):
self.label_perfect_train_matches(identical_records)
# remove identical records to avoid double labelling
X = X.drop(identical_records.index).reset_index(drop=True)
for _ in range(self.n_queries):
query_idx, query_inst = self.learner.query(np.array(X['similarity_metrics'].tolist()))

if self.learner.estimator.fitted_:
# the uncertainty calculations need a fitted estimator
# however it can occur that the estimator can only be fit after a couple rounds of querying
self._calculate_uncertainty(query_inst)
self.calculate_uncertainty(query_inst)
if self.verbose >= 2:
self._show_min_max_scores(X)

y_new = self._get_active_learning_input(X.iloc[query_idx])
self.show_min_max_scores(X)
y_new = self.get_active_learning_input(X.iloc[query_idx].iloc[0])
if y_new == 'p': # use previous (input is 'p')
y_new = self._get_active_learning_input(query_inst_prev)
y_new = self.get_active_learning_input(query_inst_prev.iloc[0])
elif y_new == 'f': # finish labelling (input is 'f')
break
query_inst_prev = X.iloc[query_idx]
@@ -187,12 +84,9 @@ def fit(self, X: pd.DataFrame) -> 'ScoringLearner':
train_sample_to_add = X.iloc[query_idx].copy()
train_sample_to_add['y'] = y_new
self.train_samples = pd.concat([self.train_samples, train_sample_to_add])

X = X.drop(query_idx).reset_index(drop=True)

if self._is_converged():
if self.is_converged():
print("Classifier converged, enter 'f' to stop training")

if y_new == '1':
self.counter_positive += 1
elif y_new == '0':
@@ -207,4 +101,4 @@ def predict_proba(self, X: Union[DataFrame, pd.DataFrame]) -> Union[DataFrame, p
X: Pandas or Spark dataframe to predict on
Returns: match probabilities
"""
return self.learner.estimator.predict_proba(X)
return self.learner.estimator.predict_proba(X)
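
The fit loop above delegates pool-based sampling to modAL. A simplified sketch of that query/teach cycle, with synthetic data and an automatic oracle in place of the console prompts, is given below; everything in it is illustrative and not part of this PR.

# Sketch of the modAL query/teach cycle that fit() builds on; synthetic
# features and an automatic oracle replace the interactive labelling.
import numpy as np
from modAL.models import ActiveLearner
from modAL.uncertainty import uncertainty_sampling
from sklearn.linear_model import LogisticRegression

rng = np.random.RandomState(0)
X_pool = rng.rand(200, 3)                          # stand-in for the 'similarity_metrics' arrays
y_pool = (X_pool.sum(axis=1) > 1.5).astype(int)    # made-up oracle labels

learner = ActiveLearner(estimator=LogisticRegression(), query_strategy=uncertainty_sampling)

# seed with a few labelled rows from both classes, mirroring label_perfect_train_matches()
seed_idx = np.concatenate([np.where(y_pool == 0)[0][:3], np.where(y_pool == 1)[0][:3]])
learner.teach(X_pool[seed_idx], y_pool[seed_idx])
X_pool, y_pool = np.delete(X_pool, seed_idx, axis=0), np.delete(y_pool, seed_idx)

for _ in range(20):
    query_idx, query_inst = learner.query(X_pool)  # most uncertain sample
    learner.teach(query_inst, y_pool[query_idx])   # user input in the real loop
    X_pool = np.delete(X_pool, query_idx, axis=0)  # drop the labelled row, as fit() does
    y_pool = np.delete(y_pool, query_idx)
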
167 changes: 167 additions & 0 deletions spark_matcher/activelearner/active_learner_base.py
@@ -0,0 +1,167 @@
from typing import List, Optional
from abc import ABC, abstractmethod
import numpy as np
import pandas as pd


class ActiveLearnerBase(ABC):
"""
A base class for active learning approaches.
Attributes:
col_names: column names used for matching
scorer: the scorer to be used in the active learning loop (supplied by subclasses)
min_nr_samples: minimum number of responses required before classifier convergence is tested
uncertainty_threshold: threshold on the uncertainty of the classifier during active learning,
used for determining if the model has converged
uncertainty_improvement_threshold: threshold on the uncertainty improvement of classifier during active
learning, used for determining if the model has converged
n_uncertainty_improvement: span of iterations to check for largest difference between uncertainties
verbose: sets verbosity
"""
def __init__(self, col_names: List[str], min_nr_samples: int = 10,
uncertainty_threshold: float = 0.1, uncertainty_improvement_threshold: float = 0.01,
n_uncertainty_improvement: int = 5, verbose: int = 0):
self.col_names = col_names
self.counter_total = 0
self.counter_positive = 0
self.counter_negative = 0
self.min_nr_samples = min_nr_samples
self.uncertainty_threshold = uncertainty_threshold
self.uncertainty_improvement_threshold = uncertainty_improvement_threshold
self.n_uncertainty_improvement = n_uncertainty_improvement
self.uncertainties = []
self.train_samples = pd.DataFrame([])
self.verbose = verbose

def input_assert(self, message: str, choices: List[str]) -> str:
"""
Wraps the built-in Python `input` function to restrict the values that can be returned
Args:
message: message to user
choices: list containing possible choices that can be returned
Returns:
input returned by user
"""
output = input(message).lower()
if output not in choices:
print(f"Wrong input! Your input should be one of the following: {', '.join(choices)}")
return self.input_assert(message, choices)
return output

def get_uncertainty_improvement(self) -> Optional[float]:
"""
Calculates the uncertainty differences during active learning. The largest difference over the `last_n`
iterations is returned. The aim of this function is to suggest early stopping of active learning.

Returns: largest uncertainty update in `last_n` iterations

"""
uncertainties = np.asarray(self.uncertainties)
abs_differences = abs(uncertainties[1:] - uncertainties[:-1])
return max(abs_differences[-self.n_uncertainty_improvement:])

def is_converged(self) -> bool:
"""
Checks whether the model is converged by comparing the last uncertainty value with the `uncertainty_threshold`
and comparing the `last_n` uncertainty improvements with the `uncertainty_improvement_threshold`. These checks
are only performed if at least `min_nr_samples` are labelled.

Returns:
boolean indicating whether the model is converged

"""
if (self.counter_total >= self.min_nr_samples) and (
len(self.uncertainties) >= self.n_uncertainty_improvement + 1):
uncertainty_improvement = self.get_uncertainty_improvement()
if (self.uncertainties[-1] <= self.uncertainty_threshold) or (
uncertainty_improvement <= self.uncertainty_improvement_threshold):
return True
else:
return False

def get_active_learning_input(self, x: pd.Series) -> np.ndarray:
"""
Obtain user input for a query during active learning.
Args:
x: query as provided by the ActiveLearner instance
Returns: label provided by the user: '1' or '0' (yes or no),
'p' to go to the previous query,
'f' to finish labelling,
's' to skip the query
"""
print(f'\nNr. {self.counter_total + 1} ({self.counter_positive}+/{self.counter_negative}-)')
print("Is this a match? (y)es, (n)o, (p)revious, (s)kip, (f)inish")
print(' ')
for element in [1, 2]:
for col_name in self.col_names:
print(f'{col_name}_{element}' + ': ' + x[f'{col_name}_{element}'])
print('')
user_input = self.input_assert("", choices=['y', 'n', 'p', 'f', 's'])
# replace 'y' and 'n' with '1' and '0' to make them valid y labels
user_input = user_input.replace('y', '1').replace('n', '0')
y_new = np.array([user_input])
return y_new

def _batch_uncertainty(self, x: np.ndarray) -> None:
"""
Calculates the average uncertainty for a given batch of data, with lower/upper bootstrap confidence bounds
"""
classwise_certainty = self.predict_proba(x)
uncertainty = 1 - np.max(classwise_certainty, axis=1)
idx = np.arange(uncertainty.shape[0])
rng = np.random.RandomState(seed=1234)
samples_uncertainty = []
for _ in range(200):
pred_idx = rng.choice(idx, size=idx.shape[0], replace=True)
uncertainty_boot = np.mean(uncertainty[pred_idx])
samples_uncertainty.append(uncertainty_boot)
bootstrap_mean = np.mean(samples_uncertainty)
ci_lower = np.percentile(samples_uncertainty, 2.5)
ci_upper = np.percentile(samples_uncertainty, 97.5)
if self.verbose:
print(f"""The average uncertainty of model for given batch is {round(bootstrap_mean, ndigits=3)}
with lower and upper confidence of [{round(ci_lower, ndigits=3)}, {round(ci_upper, ndigits=3)}].""")
self.uncertainties.append(round(bootstrap_mean, ndigits=3))

def calculate_uncertainty(self, x: np.ndarray) -> None:
# take the maximum probability of the predicted classes as proxy of the confidence of the classifier
if x.shape[0] > 1:
self._batch_uncertainty(x)
else:
confidence = self.predict_proba(x).max(axis=1)[0]
if self.verbose:
print('The uncertainty of the selected sample is:', round(1 - confidence, ndigits=3))
self.uncertainties.append(round(1 - confidence, ndigits=3))

def show_min_max_scores(self, X: pd.DataFrame) -> None:
"""
Prints the lowest and the highest logistic regression scores on train data during active learning.

Args:
X: Pandas dataframe containing train data that is available for labelling during active learning
"""
X_all = pd.concat((X, self.train_samples))
pred_max = self.predict_proba(np.array(X_all['similarity_metrics'].tolist())).max(axis=0)
print(f"""The lowest and highest score of model for the entire dataset are :
[{1 - pred_max[0]:.3f}, {pred_max[1]:.3f}]""")

@abstractmethod
def label_perfect_train_matches(self, *args, **kwargs) -> None:
"""
To prevent asking labels for the perfect matches, this function provides them to the active learner upfront.
"""
pass

@abstractmethod
def fit(self, *args, **kwargs):
"""
fit the active learner instance on data
"""
pass

@abstractmethod
def predict_proba(self, *args, **kwargs):
"""
predict results using trained model
"""
pass
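
The bootstrap behind _batch_uncertainty can be reproduced in isolation. The sketch below applies the same 200-resample mean and 95% interval to a synthetic predict_proba matrix; the probabilities are made up, only the resampling scheme follows the method above.

# Standalone sketch of the bootstrap in _batch_uncertainty; the probability
# matrix is synthetic, the resampling scheme follows the method above.
import numpy as np

rng = np.random.RandomState(seed=1234)
proba = rng.dirichlet(alpha=[2, 2], size=50)   # fake predict_proba output for a batch of 50 pairs
uncertainty = 1 - proba.max(axis=1)            # 1 - max class probability, as in calculate_uncertainty

idx = np.arange(uncertainty.shape[0])
samples = [uncertainty[rng.choice(idx, size=idx.shape[0], replace=True)].mean() for _ in range(200)]

print(f"mean uncertainty: {np.mean(samples):.3f}, "
      f"95% CI: [{np.percentile(samples, 2.5):.3f}, {np.percentile(samples, 97.5):.3f}]")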