From 1cbc8e1d5db48505ad23643fe4f7b21e6be95946 Mon Sep 17 00:00:00 2001
From: Pieter Gijsbers
Date: Sun, 8 Sep 2024 19:19:47 +0200
Subject: [PATCH] Add type annotation for benchmark parser module (#636)
It's progress in the right direction. I would prefer to refactor some methods, but they are not properly covered by tests, so this will be revisited later.
---
.github/workflows/run_all_frameworks.yml | 2 +-
amlb/benchmarks/openml.py | 6 +-
amlb/benchmarks/parser.py | 17 ++-
amlb/datautils.py | 164 ++++++++++++++---------
amlb/results.py | 2 +-
amlb/utils/core.py | 2 +-
6 files changed, 123 insertions(+), 70 deletions(-)
diff --git a/.github/workflows/run_all_frameworks.yml b/.github/workflows/run_all_frameworks.yml
index d08027284..4f25e493e 100644
--- a/.github/workflows/run_all_frameworks.yml
+++ b/.github/workflows/run_all_frameworks.yml
@@ -86,7 +86,7 @@ jobs:
if: needs.detect_changes.outputs.skip_baseline == 0
strategy:
matrix:
- framework: [constantpredictor, randomforest]
+ framework: [constantpredictor, randomforest, autogluon]
task: [iris, kc2, cholesterol]
fail-fast: false
steps:
diff --git a/amlb/benchmarks/openml.py b/amlb/benchmarks/openml.py
index 93c4cf42a..fa9befef1 100644
--- a/amlb/benchmarks/openml.py
+++ b/amlb/benchmarks/openml.py
@@ -26,8 +26,6 @@ def is_openml_benchmark(benchmark: str) -> bool:
def load_oml_benchmark(benchmark: str) -> tuple[str, str | None, list[Namespace]]:
""" Loads benchmark defined by openml suite or task, from openml/s/X or openml/t/Y. """
domain, oml_type, oml_id = benchmark.split('/')
- path = None # benchmark file does not exist on disk
- name = benchmark # name is later passed as cli input again for containers, it needs to remain parsable
if domain == "test.openml":
log.debug("Setting openml server to the test server.")
@@ -62,4 +60,6 @@ def load_oml_benchmark(benchmark: str) -> tuple[str, str | None, list[Namespace]
id="{}.org/t/{}".format(domain, tid)))
else:
raise ValueError(f"The oml_type is {oml_type} but must be 's' or 't'")
- return name, path, tasks
+ # The first argument needs to remain parsable further in the pipeline as is
+ # The second argument is path, the benchmark does not exist on disk
+ return benchmark, None, tasks
diff --git a/amlb/benchmarks/parser.py b/amlb/benchmarks/parser.py
index c4aa3e246..f78434b58 100644
--- a/amlb/benchmarks/parser.py
+++ b/amlb/benchmarks/parser.py
@@ -1,11 +1,21 @@
-from typing import List
+from __future__ import annotations
+
+from typing import List, Tuple
from .openml import is_openml_benchmark, load_oml_benchmark
from .file import load_file_benchmark
-from amlb.utils import str_sanitize
+from amlb.utils import str_sanitize, Namespace
-def benchmark_load(name, benchmark_definition_dirs: List[str]):
+def benchmark_load(
+ name: str,
+ benchmark_definition_dirs: List[str]
+ ) -> Tuple[
+ Namespace | None,
+ List[Namespace],
+ str | None,
+ str
+ ]:
""" Loads the benchmark definition for the 'benchmark' cli input string.
:param name: the value for 'benchmark'
@@ -17,7 +27,6 @@ def benchmark_load(name, benchmark_definition_dirs: List[str]):
# which is why it is tried last.
if is_openml_benchmark(name):
benchmark_name, benchmark_path, tasks = load_oml_benchmark(name)
- # elif is_kaggle_benchmark(name):
else:
benchmark_name, benchmark_path, tasks = load_file_benchmark(name, benchmark_definition_dirs)
diff --git a/amlb/datautils.py b/amlb/datautils.py
index d8a24d2ef..67524d21a 100644
--- a/amlb/datautils.py
+++ b/amlb/datautils.py
@@ -7,12 +7,20 @@
Also, this module is intended to be imported by frameworks integration modules,
therefore, it should have no dependency to any other **amlb** module outside **utils**.
"""
+from __future__ import annotations
+
import logging
import os
+from typing import Iterable, Type, Literal, Any, Callable, Tuple, cast, Union
+try:
+ from typing_extensions import TypeAlias
+except ImportError:
+ pass # Only available when dev dependencies are installed, only needed for type check
import arff
import numpy as np
import pandas as pd
+import scipy.sparse
from sklearn.base import TransformerMixin
from sklearn.impute import SimpleImputer as Imputer
from sklearn.metrics import accuracy_score, auc, average_precision_score, balanced_accuracy_score, confusion_matrix, fbeta_score, \
@@ -25,8 +33,11 @@
log = logging.getLogger(__name__)
+A: TypeAlias = Union[np.ndarray, scipy.sparse.csr_matrix]
+DF = pd.DataFrame
+S = pd.Series
-def read_csv(path, nrows=None, header=True, index=False, as_data_frame=True, dtype=None, timestamp_column=None):
+def read_csv(path, nrows=None, header=True, index=False, as_data_frame=True, dtype=None, timestamp_column=None): # type: ignore # Split up to two functions, avoid "aliasing"
"""
read csv file to DataFrame.
@@ -55,21 +66,31 @@ def read_csv(path, nrows=None, header=True, index=False, as_data_frame=True, dty
return df if as_data_frame else df.values
-def write_csv(data, path, header=True, columns=None, index=False, append=False):
- if is_data_frame(data):
+def write_csv( # type: ignore[no-untyped-def]
+ data: pd.DataFrame | dict | list | np.ndarray,
+ path,
+ header: bool = True,
+ columns: Iterable[str] | None = None,
+ index: bool = False,
+ append: bool = False
+) -> None:
+ if isinstance(data, pd.DataFrame):
data_frame = data
else:
- data_frame = to_data_frame(data, columns=columns)
+ data_frame = to_data_frame(data, column_names=columns)
header = header and columns is not None
touch(path)
- data_frame.to_csv(path,
- header=header,
- index=index,
- mode='a' if append else 'w')
+ data_frame.to_csv(
+ path,
+ header=header,
+ index=index,
+ mode=cast(Literal['a','w'], 'a' if append else 'w')
+ )
@profile(logger=log)
-def reorder_dataset(path, target_src=0, target_dest=-1, save=True):
+def reorder_dataset(path: str, target_src: int=0, target_dest: int=-1, save:bool=True) -> str | np.ndarray:
+ """ Put the `target_src`th column as the `target_dest`th column"""
if target_src == target_dest and save: # no reordering needed, not data to load, returning original path
return path
@@ -121,20 +142,21 @@ def reorder_dataset(path, target_src=0, target_dest=-1, save=True):
return reordered_path
-def is_data_frame(df):
+def is_data_frame(df: object) -> bool:
return isinstance(df, pd.DataFrame)
-def to_data_frame(obj, columns=None):
+def to_data_frame(obj: object, column_names: Iterable[str]| None=None) -> pd.DataFrame:
if obj is None:
return pd.DataFrame()
- elif isinstance(obj, dict):
- return pd.DataFrame.from_dict(obj, columns=columns, orient='columns' if columns is None else 'index')
- elif isinstance(obj, (list, np.ndarray)):
+ columns = None if column_names is None else list(column_names)
+ if isinstance(obj, dict):
+ orient = cast(Literal['columns', 'index'], 'columns' if columns is None else 'index')
+ return pd.DataFrame.from_dict(obj, columns=columns, orient=orient) # type: ignore[arg-type]
+ if isinstance(obj, (list, np.ndarray)):
return pd.DataFrame.from_records(obj, columns=columns)
- else:
- raise ValueError("Object should be a dictionary {col1:values, col2:values, ...} "
- "or an array of dictionary-like objects [{col1:val, col2:val}, {col1:val, col2:val}, ...].")
+ raise ValueError("Object should be a dictionary {col1:values, col2:values, ...} "
+ "or an array of dictionary-like objects [{col1:val, col2:val}, {col1:val, col2:val}, ...].")
class Encoder(TransformerMixin):
@@ -143,9 +165,15 @@ class Encoder(TransformerMixin):
Should never have written this, but does the job currently. However, should think about simpler single-purpose approach.
"""
- def __init__(self, type='label', target=True, encoded_type=int,
- missing_policy='ignore', missing_values=None, missing_replaced_by='',
- normalize_fn=None):
+ def __init__(
+ self,
+ type: Literal['label', 'one-hot','no-op'] ='label',
+ target:bool=True,
+ encoded_type:Type=int,
+ missing_policy:Literal['ignore', 'mask', 'encode']='ignore',
+ missing_values: Any | Iterable[Any]| None=None,
+ missing_replaced_by: Any='',
+ normalize_fn: Callable[[np.ndarray],np.ndarray] | None = None):
"""
:param type: one of ['label', 'one-hot', 'no-op'].
:param target: True iff the Encoder is applied to the target feature.
@@ -168,7 +196,7 @@ def __init__(self, type='label', target=True, encoded_type=int,
self.missing_replaced_by = missing_replaced_by
self.missing_encoded_value = None
self.normalize_fn = normalize_fn
- self.classes = None
+ self.classes: np.ndarray | None = None
self.encoded_type = encoded_type
if type == 'label':
self.delegate = LabelEncoder() if target else OrdinalEncoder()
@@ -179,44 +207,47 @@ def __init__(self, type='label', target=True, encoded_type=int,
else:
raise ValueError("Encoder `type` should be one of {}.".format(['label', 'one-hot']))
- def __repr__(self):
+ def __repr__(self) -> str:
return repr_def(self)
@property
- def _ignore_missing(self):
+ def _ignore_missing(self) -> bool:
return self.for_target or self.missing_policy == 'ignore'
@property
- def _mask_missing(self):
+ def _mask_missing(self) -> bool:
return not self.for_target and self.missing_policy == 'mask'
@property
- def _encode_missing(self):
+ def _encode_missing(self) -> bool:
return not self.for_target and self.missing_policy == 'encode'
- def _reshape(self, vec):
+ def _reshape(self, vec: np.ndarray) -> np.ndarray:
return vec if self.for_target else vec.reshape(-1, 1)
- def fit(self, vec):
+ def fit(self, vector: Iterable[str] | None) -> 'Encoder':
"""
- :param vec: must be a line vector (array)
+ :param vector: must be a line vector (array)
:return:
"""
if not self.delegate:
return self
- vec = np.asarray(vec, dtype=object)
- if self.normalize_fn:
+ if vector is None:
+ raise ValueError("`vec` can only be `None` if Encoder was initialized with type 'label' or 'one-hot'.")
+
+ vec = np.asarray(vector, dtype=object)
+ if self.normalize_fn is not None:
vec = self.normalize_fn(vec)
self.classes = np.unique(vec) if self._ignore_missing else np.unique(np.insert(vec, 0, self.missing_replaced_by))
if self._mask_missing:
- self.missing_encoded_value = self.delegate.fit_transform(self._reshape(self.classes))[0]
+ self.missing_encoded_value = self.delegate.fit_transform(self._reshape(cast(np.ndarray, self.classes)))[0]
else:
- self.delegate.fit(self._reshape(self.classes))
+ self.delegate.fit(self._reshape(cast(np.ndarray, self.classes)))
return self
- def transform(self, vec, **params):
+ def transform(self, vec: str | np.ndarray, **params: Any) -> str | np.ndarray:
"""
:param vec: must be single value (str) or a line vector (array)
:param params:
@@ -225,35 +256,35 @@ def transform(self, vec, **params):
if log.isEnabledFor(logging.TRACE):
log.debug("Transforming %s using %s", vec, self)
- return_value = lambda v: v
- if isinstance(vec, str):
- vec = [vec]
- return_value = lambda v: v[0]
+ vector = [vec] if isinstance(vec, str) else vec
- vec = np.asarray(vec, dtype=object)
+ def return_value(v: np.ndarray) -> np.ndarray | str:
+ return v[0] if isinstance(vec, str) else v
+
+ vector = np.asarray(vector, dtype=object)
if not self.delegate:
- return return_value(vec.astype(self.encoded_type, copy=False))
+ return return_value(vector.astype(self.encoded_type, copy=False))
if self._mask_missing or self._encode_missing:
- mask = [v in self.missing_values for v in vec]
+ mask = [v in self.missing_values for v in vector]
if any(mask):
# if self._mask_missing:
- # missing = vec[mask]
- vec[mask] = self.missing_replaced_by
+ # missing = vector[mask]
+ vector[mask] = self.missing_replaced_by
if self.normalize_fn:
- vec = self.normalize_fn(vec)
+ vector = self.normalize_fn(vector)
- res = self.delegate.transform(self._reshape(vec), **params).astype(self.encoded_type, copy=False)
+ res = self.delegate.transform(self._reshape(vector), **params).astype(self.encoded_type, copy=False)
if self._mask_missing:
res[mask] = np.NaN if self.encoded_type == float else None
return return_value(res)
if self.normalize_fn:
- vec = self.normalize_fn(vec)
- return return_value(self.delegate.transform(self._reshape(vec), **params).astype(self.encoded_type, copy=False))
+ vector = self.normalize_fn(vector)
+ return return_value(self.delegate.transform(self._reshape(vector), **params).astype(self.encoded_type, copy=False))
- def inverse_transform(self, vec, **params):
+ def inverse_transform(self, vec: str | np.ndarray, **params: Any) -> str | np.ndarray:
"""
:param vec: must a single value or line vector (array)
:param params:
@@ -267,7 +298,12 @@ def inverse_transform(self, vec, **params):
return self.delegate.inverse_transform(vec, **params)
-def impute_array(X_fit, *X_s, missing_values=np.NaN, strategy="mean", keep_empty_features: bool = False):
+def impute_array(
+ X_fit: A,
+ *X_s: Iterable[A],
+ missing_values: Any =np.NaN,
+ strategy: Literal['mean', 'mode', 'median', 'most_frequent'] | Tuple[Literal['constant'], Any]="mean",
+ keep_empty_features: bool = False) -> list[A] | A:
"""
:param X_fit: {array-like, sparse matrix} used to fit the imputer. This array is also imputed.
:param X_s: the additional (optional) arrays that are imputed using the same imputer.
@@ -275,6 +311,7 @@ def impute_array(X_fit, *X_s, missing_values=np.NaN, strategy="mean", keep_empty
:param strategy: 'mean' (default) -> missing values are imputed with the mean value of the corresponding vector.
'median' -> missing values are imputed with the median value of the corresponding vector.
'mode' -> missing values are imputed with the mode of the corresponding vector.
+ 'most_frequent' -> alias for 'mode'
('constant', value) -> missing values are imputed with the constant value provided as the second term of the tuple.
None -> no-op (for internal use).
:param keep_empty_features: bool (default False), if False remove all columns which only have nan values.
@@ -282,21 +319,22 @@ def impute_array(X_fit, *X_s, missing_values=np.NaN, strategy="mean", keep_empty
"""
if strategy is None:
return [X_fit, *X_s]
- strategy, fill_value = strategy if isinstance(strategy, tuple) and strategy[0] == 'constant' else (strategy, None)
- strategy = dict(mode='most_frequent').get(strategy, strategy)
+ strategy_name, fill_value = strategy if isinstance(strategy, tuple) and strategy[0] == 'constant' else (strategy, None)
+ strategy_name = dict(mode='most_frequent').get(strategy_name, strategy_name) # type: ignore
- imputer = Imputer(missing_values=missing_values, strategy=strategy, fill_value=fill_value, keep_empty_features=keep_empty_features)
+ imputer = Imputer(missing_values=missing_values, strategy=strategy_name, fill_value=fill_value, keep_empty_features=keep_empty_features)
imputed = _restore_dtypes(imputer.fit_transform(X_fit), X_fit)
if len(X_s) > 0:
result = [imputed]
for X in X_s:
- result.append(_restore_dtypes(imputer.transform(X), X))
+ result.append(_restore_dtypes(imputer.transform(X), X)) # type: ignore
return result
- else:
- return imputed
+ return imputed
-def impute_dataframe(X_fit: pd.DataFrame, *X_s: pd.DataFrame, missing_values=np.NaN, strategy='mean'):
+def impute_dataframe(X_fit: pd.DataFrame, *X_s: Iterable[pd.DataFrame], missing_values: Any=np.NaN,
+ strategy: Literal['mean','median','mode'] | Tuple[Literal['constant'], Any] ='mean'
+ ) -> pd.DataFrame | list[pd.DataFrame]:
"""
:param X_fit: used to fit the imputer. This dataframe is also imputed.
:param X_s: the additional (optional) dataframe that are imputed using the same imputer.
@@ -320,29 +358,35 @@ def impute_dataframe(X_fit: pd.DataFrame, *X_s: pd.DataFrame, missing_values=np.
return imputed if X_s else imputed[0]
-def _impute_pd(X_fit, *X_s, missing_values=np.NaN, strategy=None, is_int=False):
+def _impute_pd(
+ X_fit: pd.DataFrame,
+ *X_s: Iterable[pd.DataFrame],
+ missing_values: Any = np.NaN,
+ strategy: Literal['mean','median','mode'] | Tuple[Literal['constant'], Any] | None =None,
+ is_int: bool = False
+) -> list[pd.DataFrame]:
if strategy == 'mean':
fill = X_fit.mean()
elif strategy == 'median':
fill = X_fit.median()
elif strategy == 'mode':
- fill = X_fit.mode().iloc[0, :]
+ fill = X_fit.mode().iloc[0, :] # type: ignore[call-overload]
elif isinstance(strategy, tuple) and strategy[0] == 'constant':
fill = strategy[1]
else:
- return [X_fit, *X_s]
+ return [X_fit, *X_s] # type: ignore[list-item] # doesn't seem to understand unpacking
if is_int and isinstance(fill, pd.Series):
fill = fill.round()
return [df.replace(missing_values, fill) for df in [X_fit, *X_s]]
-def _rows_with_nas(X):
+def _rows_with_nas(X: np.ndarray | pd.DataFrame) -> pd.DataFrame:
df = X if isinstance(X, pd.DataFrame) else pd.DataFrame(X)
return df[df.isna().any(axis=1)]
-def _restore_dtypes(X_np, X_ori):
+def _restore_dtypes(X_np: np.ndarray, X_ori: pd.DataFrame | pd.Series | np.ndarray) -> pd.DataFrame | pd.Series | np.ndarray:
if isinstance(X_ori, pd.DataFrame):
df = pd.DataFrame(X_np, columns=X_ori.columns, index=X_ori.index).convert_dtypes()
df.astype(X_ori.dtypes.to_dict(), copy=False, errors='raise')
diff --git a/amlb/results.py b/amlb/results.py
index 4cef9498e..b3991fac5 100644
--- a/amlb/results.py
+++ b/amlb/results.py
@@ -309,7 +309,7 @@ def save_predictions(dataset: Dataset, output_file: str,
if probabilities is not None:
prob_cols = probabilities_labels if probabilities_labels else dataset.target.label_encoder.classes
- df = to_data_frame(probabilities, columns=prob_cols)
+ df = to_data_frame(probabilities, column_names=prob_cols)
if probabilities_labels is not None:
df = df[sort(prob_cols)] # reorder columns alphabetically: necessary to match label encoding
if any(prob_cols != df.columns.values):
diff --git a/amlb/utils/core.py b/amlb/utils/core.py
index 130e910fc..a7b34892e 100644
--- a/amlb/utils/core.py
+++ b/amlb/utils/core.py
@@ -345,7 +345,7 @@ def str_iter(col, sep=", "):
return sep.join(map(str, col))
-def str_sanitize(s):
+def str_sanitize(s: str) ->str:
return re.sub(r"[^\w-]", "_", s)