Add type annotation for benchmark parser module #636

Merged: 11 commits, Sep 8, 2024
2 changes: 1 addition & 1 deletion .github/workflows/run_all_frameworks.yml
@@ -86,7 +86,7 @@ jobs:
if: needs.detect_changes.outputs.skip_baseline == 0
strategy:
matrix:
framework: [constantpredictor, randomforest]
framework: [constantpredictor, randomforest, autogluon]
task: [iris, kc2, cholesterol]
fail-fast: false
steps:
6 changes: 3 additions & 3 deletions amlb/benchmarks/openml.py
@@ -26,8 +26,6 @@ def is_openml_benchmark(benchmark: str) -> bool:
def load_oml_benchmark(benchmark: str) -> tuple[str, str | None, list[Namespace]]:
""" Loads benchmark defined by openml suite or task, from openml/s/X or openml/t/Y. """
domain, oml_type, oml_id = benchmark.split('/')
path = None # benchmark file does not exist on disk
name = benchmark # name is later passed as cli input again for containers, it needs to remain parsable

if domain == "test.openml":
log.debug("Setting openml server to the test server.")
@@ -62,4 +60,6 @@ def load_oml_benchmark(benchmark: str) -> tuple[str, str | None, list[Namespace]]:
id="{}.org/t/{}".format(domain, tid)))
else:
raise ValueError(f"The oml_type is {oml_type} but must be 's' or 't'")
return name, path, tasks
# The first return value must stay parsable further down the pipeline as-is
# The second return value is the benchmark path; it is None because the definition does not exist on disk
return benchmark, None, tasks
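
For readers skimming the diff: the new return contract keeps the raw CLI string as the benchmark name and always returns None for the path. A minimal sketch of what that means; the suite id "openml/s/271" is an illustrative value, and the OpenML network calls are omitted:

# "openml/s/271" splits into (domain, type, id); a "test.openml/..." prefix selects the test server
domain, oml_type, oml_id = "openml/s/271".split('/')
assert (domain, oml_type, oml_id) == ("openml", "s", "271")

# load_oml_benchmark("openml/s/271") now returns the raw string as `name`
# (containers re-parse it later) and None as `path` (nothing exists on disk):
# name, path, tasks = load_oml_benchmark("openml/s/271")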
17 changes: 13 additions & 4 deletions amlb/benchmarks/parser.py
@@ -1,11 +1,21 @@
from typing import List
from __future__ import annotations

from typing import List, Tuple

from .openml import is_openml_benchmark, load_oml_benchmark
from .file import load_file_benchmark
from amlb.utils import str_sanitize
from amlb.utils import str_sanitize, Namespace


def benchmark_load(name, benchmark_definition_dirs: List[str]):
def benchmark_load(
name: str,
benchmark_definition_dirs: List[str]
) -> Tuple[
Namespace | None,
List[Namespace],
str | None,
str
]:
""" Loads the benchmark definition for the 'benchmark' cli input string.

:param name: the value for 'benchmark'
@@ -17,7 +27,6 @@ def benchmark_load(name, benchmark_definition_dirs: List[str]):
# which is why it is tried last.
if is_openml_benchmark(name):
benchmark_name, benchmark_path, tasks = load_oml_benchmark(name)
# elif is_kaggle_benchmark(name):
else:
benchmark_name, benchmark_path, tasks = load_file_benchmark(name, benchmark_definition_dirs)

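To make the new signature concrete, a hedged usage sketch: the element names below are my reading of the annotation (the function's return statement is collapsed in this diff), and the argument values are illustrative only.

from amlb.benchmarks.parser import benchmark_load

defaults, tasks, path, name = benchmark_load("validation", ["resources/benchmarks"])
# defaults: Namespace | None  - benchmark-wide task defaults, if any
# tasks:    List[Namespace]   - one Namespace per task definition
# path:     str | None        - None for OpenML suites/tasks (no file on disk)
# name:     str               - sanitized benchmark name
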
164 changes: 104 additions & 60 deletions amlb/datautils.py
@@ -7,12 +7,20 @@
Also, this module is intended to be imported by frameworks integration modules,
therefore, it should have no dependency to any other **amlb** module outside **utils**.
"""
from __future__ import annotations

import logging
import os
from typing import Iterable, Type, Literal, Any, Callable, Tuple, cast, Union
try:
from typing_extensions import TypeAlias
except ImportError:
pass # Only available when dev dependencies are installed; only needed for type checking

import arff
import numpy as np
import pandas as pd
import scipy.sparse
from sklearn.base import TransformerMixin
from sklearn.impute import SimpleImputer as Imputer
from sklearn.metrics import accuracy_score, auc, average_precision_score, balanced_accuracy_score, confusion_matrix, fbeta_score, \
@@ -25,8 +33,11 @@

log = logging.getLogger(__name__)

A: TypeAlias = Union[np.ndarray, scipy.sparse.csr_matrix]
DF = pd.DataFrame
S = pd.Series

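The guarded typing_extensions import above works because of `from __future__ import annotations`: annotations are stored as strings and never evaluated at runtime, so a missing TypeAlias only matters to the type checker. A self-contained sketch of the pattern:

from __future__ import annotations

try:
    from typing_extensions import TypeAlias  # dev-only dependency
except ImportError:
    pass  # fine at runtime: the annotation below is never evaluated

import numpy as np

A: TypeAlias = np.ndarray  # the assignment runs; the `TypeAlias` annotation does not

def zeros_like(x: A) -> A:  # resolved only by the type checker
    return np.zeros(x.shape)
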
def read_csv(path, nrows=None, header=True, index=False, as_data_frame=True, dtype=None, timestamp_column=None):
def read_csv(path, nrows=None, header=True, index=False, as_data_frame=True, dtype=None, timestamp_column=None): # type: ignore # Should be split into two functions to avoid "aliasing"
"""
Read a CSV file into a DataFrame.

@@ -55,21 +66,31 @@ def read_csv(path, nrows=None, header=True, index=False, as_data_frame=True, dtype=None, timestamp_column=None):
return df if as_data_frame else df.values


def write_csv(data, path, header=True, columns=None, index=False, append=False):
if is_data_frame(data):
def write_csv( # type: ignore[no-untyped-def]
data: pd.DataFrame | dict | list | np.ndarray,
path,
header: bool = True,
columns: Iterable[str] | None = None,
index: bool = False,
append: bool = False
) -> None:
if isinstance(data, pd.DataFrame):
data_frame = data
else:
data_frame = to_data_frame(data, columns=columns)
data_frame = to_data_frame(data, column_names=columns)
header = header and columns is not None
touch(path)
data_frame.to_csv(path,
header=header,
index=index,
mode='a' if append else 'w')
data_frame.to_csv(
path,
header=header,
index=index,
mode=cast(Literal['a','w'], 'a' if append else 'w')
)

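The cast in write_csv exists only to satisfy the type checker: `'a' if append else 'w'` is inferred as plain str, while the pandas stubs expect a narrower mode type. An equivalent alternative, narrowing via a variable annotation instead (write_frame is a hypothetical name, not part of this PR):

from __future__ import annotations

from typing import Literal

import pandas as pd

def write_frame(df: pd.DataFrame, path: str, append: bool = False) -> None:
    # Annotating the variable makes mypy infer Literal['a', 'w'] for the
    # conditional, so no cast is needed; runtime behavior is identical.
    mode: Literal['a', 'w'] = 'a' if append else 'w'
    df.to_csv(path, mode=mode)
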

@profile(logger=log)
def reorder_dataset(path, target_src=0, target_dest=-1, save=True):
def reorder_dataset(path: str, target_src: int = 0, target_dest: int = -1, save: bool = True) -> str | np.ndarray:
""" Put the `target_src`th column as the `target_dest`th column"""
if target_src == target_dest and save: # no reordering needed, no data to load, returning original path
return path

@@ -121,20 +142,21 @@ def reorder_dataset(path, target_src=0, target_dest=-1, save=True):
return reordered_path

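reorder_dataset's body is largely collapsed in this diff; its documented behavior is simply a column move. A plain-numpy sketch of that semantics (move_column is a hypothetical helper, not code from this PR):

import numpy as np

def move_column(data: np.ndarray, src: int = 0, dest: int = -1) -> np.ndarray:
    # Move column `src` to position `dest`, keeping the relative order of the rest.
    cols = list(range(data.shape[1]))
    col = cols.pop(src)
    cols.insert(dest if dest >= 0 else len(cols) + dest + 1, col)
    return data[:, cols]

X = np.arange(6).reshape(2, 3)  # columns 0, 1, 2
assert move_column(X, 0, -1).tolist() == [[1, 2, 0], [4, 5, 3]]
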

def is_data_frame(df):
def is_data_frame(df: object) -> bool:
return isinstance(df, pd.DataFrame)


def to_data_frame(obj, columns=None):
def to_data_frame(obj: object, column_names: Iterable[str] | None = None) -> pd.DataFrame:
if obj is None:
return pd.DataFrame()
elif isinstance(obj, dict):
return pd.DataFrame.from_dict(obj, columns=columns, orient='columns' if columns is None else 'index')
elif isinstance(obj, (list, np.ndarray)):
columns = None if column_names is None else list(column_names)
if isinstance(obj, dict):
orient = cast(Literal['columns', 'index'], 'columns' if columns is None else 'index')
return pd.DataFrame.from_dict(obj, columns=columns, orient=orient) # type: ignore[arg-type]
if isinstance(obj, (list, np.ndarray)):
return pd.DataFrame.from_records(obj, columns=columns)
else:
raise ValueError("Object should be a dictionary {col1:values, col2:values, ...} "
"or an array of dictionary-like objects [{col1:val, col2:val}, {col1:val, col2:val}, ...].")
raise ValueError("Object should be a dictionary {col1:values, col2:values, ...} "
"or an array of dictionary-like objects [{col1:val, col2:val}, {col1:val, col2:val}, ...].")

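Note that the renamed `column_names` parameter changes meaning with the input shape: for a dict it switches from_dict to row ('index') orientation, while for records it simply labels the columns. Illustrative calls against pandas directly:

import pandas as pd

# dict of columns (default 'columns' orientation, no column names given):
pd.DataFrame.from_dict({'col1': [1, 2], 'col2': [3, 4]}, orient='columns')

# dict of rows plus explicit column names (the orient='index' branch):
pd.DataFrame.from_dict({'row1': [1, 3]}, orient='index', columns=['col1', 'col2'])

# list of record dicts (the from_records branch):
pd.DataFrame.from_records([{'col1': 1, 'col2': 3}, {'col1': 2, 'col2': 4}])
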

class Encoder(TransformerMixin):
@@ -143,9 +165,15 @@ class Encoder(TransformerMixin):
Should never have written this, but it does the job currently. However, we should think about a simpler, single-purpose approach.
"""

def __init__(self, type='label', target=True, encoded_type=int,
missing_policy='ignore', missing_values=None, missing_replaced_by='',
normalize_fn=None):
def __init__(
self,
type: Literal['label', 'one-hot', 'no-op'] = 'label',
target: bool = True,
encoded_type: Type = int,
missing_policy: Literal['ignore', 'mask', 'encode'] = 'ignore',
missing_values: Any | Iterable[Any] | None = None,
missing_replaced_by: Any = '',
normalize_fn: Callable[[np.ndarray], np.ndarray] | None = None):
"""
:param type: one of ['label', 'one-hot', 'no-op'].
:param target: True iff the Encoder is applied to the target feature.
@@ -168,7 +196,7 @@ def __init__(self, type='label', target=True, encoded_type=int,
self.missing_replaced_by = missing_replaced_by
self.missing_encoded_value = None
self.normalize_fn = normalize_fn
self.classes = None
self.classes: np.ndarray | None = None
self.encoded_type = encoded_type
if type == 'label':
self.delegate = LabelEncoder() if target else OrdinalEncoder()
@@ -179,44 +207,47 @@ def __init__(self, type='label', target=True, encoded_type=int,
else:
raise ValueError("Encoder `type` should be one of {}.".format(['label', 'one-hot']))

def __repr__(self):
def __repr__(self) -> str:
return repr_def(self)

@property
def _ignore_missing(self):
def _ignore_missing(self) -> bool:
return self.for_target or self.missing_policy == 'ignore'

@property
def _mask_missing(self):
def _mask_missing(self) -> bool:
return not self.for_target and self.missing_policy == 'mask'

@property
def _encode_missing(self):
def _encode_missing(self) -> bool:
return not self.for_target and self.missing_policy == 'encode'

def _reshape(self, vec):
def _reshape(self, vec: np.ndarray) -> np.ndarray:
return vec if self.for_target else vec.reshape(-1, 1)

def fit(self, vec):
def fit(self, vector: Iterable[str] | None) -> 'Encoder':
"""
:param vec: must be a line vector (array)
:param vector: must be a line vector (array)
:return:
"""
if not self.delegate:
return self

vec = np.asarray(vec, dtype=object)
if self.normalize_fn:
if vector is None:
raise ValueError("`vec` can only be `None` if Encoder was initialized with type 'label' or 'one-hot'.")

vec = np.asarray(vector, dtype=object)
if self.normalize_fn is not None:
vec = self.normalize_fn(vec)
self.classes = np.unique(vec) if self._ignore_missing else np.unique(np.insert(vec, 0, self.missing_replaced_by))

if self._mask_missing:
self.missing_encoded_value = self.delegate.fit_transform(self._reshape(self.classes))[0]
self.missing_encoded_value = self.delegate.fit_transform(self._reshape(cast(np.ndarray, self.classes)))[0]
else:
self.delegate.fit(self._reshape(self.classes))
self.delegate.fit(self._reshape(cast(np.ndarray, self.classes)))
return self

def transform(self, vec, **params):
def transform(self, vec: str | np.ndarray, **params: Any) -> str | np.ndarray:
"""
:param vec: must be single value (str) or a line vector (array)
:param params:
@@ -225,35 +256,35 @@ def transform(self, vec, **params):
if log.isEnabledFor(logging.TRACE):
log.debug("Transforming %s using %s", vec, self)

return_value = lambda v: v
if isinstance(vec, str):
vec = [vec]
return_value = lambda v: v[0]
vector = [vec] if isinstance(vec, str) else vec

vec = np.asarray(vec, dtype=object)
def return_value(v: np.ndarray) -> np.ndarray | str:
return v[0] if isinstance(vec, str) else v

vector = np.asarray(vector, dtype=object)

if not self.delegate:
return return_value(vec.astype(self.encoded_type, copy=False))
return return_value(vector.astype(self.encoded_type, copy=False))

if self._mask_missing or self._encode_missing:
mask = [v in self.missing_values for v in vec]
mask = [v in self.missing_values for v in vector]
if any(mask):
# if self._mask_missing:
# missing = vec[mask]
vec[mask] = self.missing_replaced_by
# missing = vector[mask]
vector[mask] = self.missing_replaced_by
if self.normalize_fn:
vec = self.normalize_fn(vec)
vector = self.normalize_fn(vector)

res = self.delegate.transform(self._reshape(vec), **params).astype(self.encoded_type, copy=False)
res = self.delegate.transform(self._reshape(vector), **params).astype(self.encoded_type, copy=False)
if self._mask_missing:
res[mask] = np.NaN if self.encoded_type == float else None
return return_value(res)

if self.normalize_fn:
vec = self.normalize_fn(vec)
return return_value(self.delegate.transform(self._reshape(vec), **params).astype(self.encoded_type, copy=False))
vector = self.normalize_fn(vector)
return return_value(self.delegate.transform(self._reshape(vector), **params).astype(self.encoded_type, copy=False))

def inverse_transform(self, vec, **params):
def inverse_transform(self, vec: str | np.ndarray, **params: Any) -> str | np.ndarray:
"""
:param vec: must a single value or line vector (array)
:param params:
@@ -267,36 +298,43 @@ def inverse_transform(self, vec, **params):
return self.delegate.inverse_transform(vec, **params)

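A hedged usage sketch of the annotated Encoder, with illustrative values; it exercises the single-value-in/single-value-out convention that the new `str | np.ndarray` annotations encode:

import numpy as np

enc = Encoder(type='label', target=True).fit(['no', 'yes', 'yes'])

enc.transform('yes')                     # single value in -> single encoded value out (1)
enc.transform(np.array(['no', 'yes']))   # array in -> array([0, 1])
enc.inverse_transform(np.array([0, 1]))  # -> array(['no', 'yes'], dtype=object)
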

def impute_array(X_fit, *X_s, missing_values=np.NaN, strategy="mean", keep_empty_features: bool = False):
def impute_array(
X_fit: A,
*X_s: A,
missing_values: Any = np.NaN,
strategy: Literal['mean', 'mode', 'median', 'most_frequent'] | Tuple[Literal['constant'], Any] = "mean",
keep_empty_features: bool = False) -> list[A] | A:
"""
:param X_fit: {array-like, sparse matrix} used to fit the imputer. This array is also imputed.
:param X_s: the additional (optional) arrays that are imputed using the same imputer.
:param missing_values: the value that will be substituted during the imputation.
:param strategy: 'mean' (default) -> missing values are imputed with the mean value of the corresponding vector.
'median' -> missing values are imputed with the median value of the corresponding vector.
'mode' -> missing values are imputed with the mode of the corresponding vector.
'most_frequent' -> alias for 'mode'
('constant', value) -> missing values are imputed with the constant value provided as the second term of the tuple.
None -> no-op (for internal use).
:param keep_empty_features: bool (default False), if False remove all columns which only have nan values.
:return: a list of imputed arrays, returned in the same order as they were provided.
"""
if strategy is None:
return [X_fit, *X_s]
strategy, fill_value = strategy if isinstance(strategy, tuple) and strategy[0] == 'constant' else (strategy, None)
strategy = dict(mode='most_frequent').get(strategy, strategy)
strategy_name, fill_value = strategy if isinstance(strategy, tuple) and strategy[0] == 'constant' else (strategy, None)
strategy_name = dict(mode='most_frequent').get(strategy_name, strategy_name) # type: ignore

imputer = Imputer(missing_values=missing_values, strategy=strategy, fill_value=fill_value, keep_empty_features=keep_empty_features)
imputer = Imputer(missing_values=missing_values, strategy=strategy_name, fill_value=fill_value, keep_empty_features=keep_empty_features)
imputed = _restore_dtypes(imputer.fit_transform(X_fit), X_fit)
if len(X_s) > 0:
result = [imputed]
for X in X_s:
result.append(_restore_dtypes(imputer.transform(X), X))
result.append(_restore_dtypes(imputer.transform(X), X)) # type: ignore
return result
else:
return imputed
return imputed

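Usage sketch for impute_array with illustrative arrays; note that the 'mode' alias and the ('constant', value) tuple form both funnel into sklearn's SimpleImputer:

import numpy as np

X_train = np.array([[1.0, np.nan], [3.0, 4.0]])
X_test = np.array([[np.nan, 2.0]])

X_train_imp, X_test_imp = impute_array(X_train, X_test, strategy='mean')
# column means of X_train are [2.0, 4.0], so X_test_imp becomes [[2.0, 2.0]]

X_zeros = impute_array(X_train, strategy=('constant', 0))  # a single array back when no X_s given
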

def impute_dataframe(X_fit: pd.DataFrame, *X_s: pd.DataFrame, missing_values=np.NaN, strategy='mean'):
def impute_dataframe(X_fit: pd.DataFrame, *X_s: pd.DataFrame, missing_values: Any = np.NaN,
strategy: Literal['mean', 'median', 'mode'] | Tuple[Literal['constant'], Any] = 'mean'
) -> pd.DataFrame | list[pd.DataFrame]:
"""
:param X_fit: used to fit the imputer. This dataframe is also imputed.
:param X_s: the additional (optional) dataframe that are imputed using the same imputer.
@@ -320,29 +358,35 @@ def impute_dataframe(X_fit: pd.DataFrame, *X_s: pd.DataFrame, missing_values=np.NaN, strategy='mean'):
return imputed if X_s else imputed[0]

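And the DataFrame counterpart, assuming it delegates to the `_impute_pd` helper below as the names suggest (frames are illustrative):

import numpy as np
import pandas as pd

train = pd.DataFrame({'num': [1.0, 1.0, np.nan], 'cat': ['a', 'a', np.nan]})
test = pd.DataFrame({'num': [np.nan], 'cat': [np.nan]})

train_imp, test_imp = impute_dataframe(train, test, strategy='mode')
# fills are learned from `train` only: num -> 1.0, cat -> 'a'
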

def _impute_pd(X_fit, *X_s, missing_values=np.NaN, strategy=None, is_int=False):
def _impute_pd(
X_fit: pd.DataFrame,
*X_s: pd.DataFrame,
missing_values: Any = np.NaN,
strategy: Literal['mean', 'median', 'mode'] | Tuple[Literal['constant'], Any] | None = None,
is_int: bool = False
) -> list[pd.DataFrame]:
if strategy == 'mean':
fill = X_fit.mean()
elif strategy == 'median':
fill = X_fit.median()
elif strategy == 'mode':
fill = X_fit.mode().iloc[0, :]
fill = X_fit.mode().iloc[0, :] # type: ignore[call-overload]
elif isinstance(strategy, tuple) and strategy[0] == 'constant':
fill = strategy[1]
else:
return [X_fit, *X_s]
return [X_fit, *X_s] # type: ignore[list-item] # doesn't seem to understand unpacking

if is_int and isinstance(fill, pd.Series):
fill = fill.round()
return [df.replace(missing_values, fill) for df in [X_fit, *X_s]]


def _rows_with_nas(X):
def _rows_with_nas(X: np.ndarray | pd.DataFrame) -> pd.DataFrame:
df = X if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
return df[df.isna().any(axis=1)]


def _restore_dtypes(X_np, X_ori):
def _restore_dtypes(X_np: np.ndarray, X_ori: pd.DataFrame | pd.Series | np.ndarray) -> pd.DataFrame | pd.Series | np.ndarray:
if isinstance(X_ori, pd.DataFrame):
df = pd.DataFrame(X_np, columns=X_ori.columns, index=X_ori.index).convert_dtypes()
df.astype(X_ori.dtypes.to_dict(), copy=False, errors='raise')