"""
@author: David Diaz Vico
@license: MIT
"""
from __future__ import annotations
import itertools
import os
import sys
from contextlib import contextmanager
from dataclasses import dataclass
from inspect import signature
from tempfile import NamedTemporaryFile, mkdtemp
from time import perf_counter, process_time, sleep
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Literal,
Mapping,
Protocol,
Sequence,
Tuple,
TypeVar,
Union,
)
from warnings import warn
import joblib
import numpy as np
from sacred import Experiment, Ingredient
from sacred.observers import FileStorageObserver, MongoObserver, RunObserver
from sklearn.base import BaseEstimator, is_classifier
from sklearn.model_selection import check_cv
from sklearn.utils import Bunch, is_scalar_nan
from incense import ExperimentLoader, FileSystemExperimentLoader
from incense.experiment import FileSystemExperiment
SelfType = TypeVar("SelfType")
class DataLike(Protocol):
def __getitem__(
self: SelfType,
key: np.typing.NDArray[int],
) -> SelfType:
pass
def __len__(self) -> int:
pass
DataType = TypeVar("DataType", bound=DataLike, contravariant=True)
TargetType = TypeVar("TargetType", bound=DataLike)
IndicesType = Tuple[np.typing.NDArray[int], np.typing.NDArray[int]]
ExplicitSplitType = Tuple[
np.typing.NDArray[float],
np.typing.NDArray[Union[float, int]],
np.typing.NDArray[float],
np.typing.NDArray[Union[float, int]],
]
ConfigLike = Union[
Mapping[str, Any],
str,
]
class EstimatorProtocol(Protocol[DataType, TargetType]):
def fit(self: SelfType, X: DataType, y: TargetType) -> SelfType:
pass
def predict(self, X: DataType) -> TargetType:
pass
class CVSplitter(Protocol):
def split(
self,
X: np.typing.NDArray[float],
y: None = None,
groups: None = None,
) -> Iterable[IndicesType]:
pass
def get_n_splits(
self,
X: np.typing.NDArray[float],
y: None = None,
groups: None = None,
) -> int:
pass
CVLike = Union[
CVSplitter,
Iterable[IndicesType],
int,
None,
]
EstimatorLike = Union[
EstimatorProtocol[Any, Any],
Callable[..., EstimatorProtocol[Any, Any]],
Tuple[Callable[..., EstimatorProtocol[Any, Any]], ConfigLike],
]
DatasetLike = Union[
Bunch,
Callable[..., Bunch],
Tuple[Callable[..., Bunch], ConfigLike],
]
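# Illustrative only: the accepted forms of ``EstimatorLike`` and
# ``DatasetLike`` mirror the unions above (``Ridge`` and ``load_iris`` are
# arbitrary scikit-learn examples, not required by this module):
#
#   estimator_like = Ridge()                   # an estimator instance
#   estimator_like = lambda: Ridge()           # a factory callable
#   estimator_like = (Ridge, {"alpha": 0.5})   # factory plus extra config
#
#   dataset_like = load_iris()                 # a Bunch instance
#   dataset_like = load_iris                   # a fetch callable
#   dataset_like = (load_iris, {})             # fetch callable plus config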
@dataclass
class ScoresInfo:
r"""
Class containing the scores of several related experiments.
Attributes
----------
dataset_names : Sequence of :external:class:`str`
Names of the datasets, in the same order in which they appear
in the rows of the scores.
estimator_names : Sequence of :external:class:`str`
Names of the estimators, in the same order in which they appear
in the columns of the scores.
scores : :external:class:`numpy.ndarray`
Test scores. It has size ``n_datasets`` :math:`\times` ``n_estimators``
:math:`\times` ``n_partitions``.
scores_mean : :external:class:`numpy.ndarray`
Test score means. It has size ``n_datasets``
:math:`\times` ``n_estimators``.
scores_std : :external:class:`numpy.ndarray`
Test score standard deviations. It has size ``n_datasets``
:math:`\times` ``n_estimators``.
See Also
--------
fetch_scores
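Examples
--------
A sketch of how the fields line up (``info`` would typically come from
:func:`fetch_scores`; the dataset and estimator names are illustrative):

>>> i = info.dataset_names.index("iris")  # doctest: +SKIP
>>> j = info.estimator_names.index("tree")  # doctest: +SKIP
>>> per_partition_scores = info.scores[i, j]  # doctest: +SKIP
>>> mean_score = info.scores_mean[i, j]  # doctest: +SKIP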
"""
dataset_names: Sequence[str]
estimator_names: Sequence[str]
scores: np.typing.NDArray[float]
scores_mean: np.typing.NDArray[float]
scores_std: np.typing.NDArray[float]
def _append_info(experiment: Experiment, name: str, value: Any) -> None:
info_list = experiment.info.get(name, [])
info_list.append(value)
experiment.info[name] = info_list
@contextmanager
def _add_timing(experiment: Experiment, name: str) -> Iterator[None]:
initial_time = perf_counter()
try:
yield None
finally:
final_time = perf_counter()
elapsed_time = final_time - initial_time
_append_info(experiment, name, elapsed_time)
def _iterate_outer_cv(
outer_cv: CVLike | Iterable[
Tuple[DataType, TargetType, DataType, TargetType]
],
estimator: EstimatorProtocol[DataType, TargetType],
X: DataType,
y: TargetType,
) -> Iterable[Tuple[DataType, TargetType, DataType, TargetType]]:
"""Iterate over multiple partitions."""
if isinstance(outer_cv, Iterable):
outer_cv, cv_copy = itertools.tee(outer_cv)
# If the iterable already yields explicit
# (X_train, y_train, X_test, y_test) splits, forward them directly
# and skip check_cv.
if len(next(cv_copy)) == 4:
yield from outer_cv
return
cv = check_cv(outer_cv, y, classifier=is_classifier(estimator))
yield from (
(X[train], y[train], X[test], y[test])
for train, test in cv.split(X, y)
)
def _benchmark_from_data(
experiment: Experiment,
*,
estimator: BaseEstimator,
X_train: DataType,
y_train: TargetType,
X_test: DataType,
y_test: TargetType,
save_estimator: bool = False,
save_train: bool = False,
) -> None:
with _add_timing(experiment, "fit_time"):
estimator.fit(X_train, y_train)
if save_estimator:
_append_info(experiment, "fitted_estimator", estimator)
best_params = getattr(estimator, "best_params_", None)
if best_params:
_append_info(experiment, "search_best_params", best_params)
best_score = getattr(estimator, "best_score_", None)
if best_score is not None:
_append_info(experiment, "search_best_score", best_score)
with _add_timing(experiment, "score_time"):
test_score = estimator.score(X_test, y_test)
_append_info(experiment, "test_score", float(test_score))
if save_train:
train_score = estimator.score(X_train, y_train)
_append_info(experiment, "train_score", float(train_score))
for output in ("transform", "predict"):
method = getattr(estimator, output, None)
if method is not None:
with _add_timing(experiment, f"{output}_time"):
_append_info(experiment, f"{output}", method(X_test))
def _compute_means(experiment: Experiment) -> None:
experiment.info["score_mean"] = float(
np.nanmean(experiment.info["test_score"])
)
experiment.info["score_std"] = float(
np.nanstd(experiment.info["test_score"])
)
def _benchmark_one(
experiment: Experiment,
*,
estimator: BaseEstimator,
data: Bunch,
save_estimator: bool = False,
save_train: bool = False,
) -> None:
"""Use only one predefined partition."""
X = data.data
y = data.target
train_indices = getattr(data, "train_indices", [])
validation_indices = getattr(data, "validation_indices", [])
test_indices = getattr(data, "test_indices", [])
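# The index containers are assumed to be Python lists, so ``+`` below
# concatenates the train and validation indices.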
X_train_val = (
X[train_indices + validation_indices]
if train_indices
else X
)
y_train_val = (
y[train_indices + validation_indices]
if train_indices
else y
)
X_test = X[test_indices]
y_test = y[test_indices]
_benchmark_from_data(
experiment=experiment,
estimator=estimator,
X_train=X_train_val,
y_train=y_train_val,
X_test=X_test,
y_test=y_test,
save_estimator=save_estimator,
save_train=save_train,
)
_compute_means(experiment)
def _benchmark_partitions(
experiment: Experiment,
*,
estimator: BaseEstimator,
data: Bunch,
save_estimator: bool = False,
save_train: bool = False,
outer_cv: CVLike | Literal["dataset"] = None,
) -> None:
"""Use several partitions."""
outer_cv = data.outer_cv if outer_cv == "dataset" else outer_cv
for X_train, y_train, X_test, y_test in _iterate_outer_cv(
outer_cv=outer_cv,
estimator=estimator,
X=data.data,
y=data.target,
):
_benchmark_from_data(
experiment=experiment,
estimator=estimator,
X_train=X_train,
y_train=y_train,
X_test=X_test,
y_test=y_test,
save_estimator=save_estimator,
save_train=save_train,
)
_compute_means(experiment)
def _benchmark(
experiment: Experiment,
*,
estimator: BaseEstimator,
data: Bunch,
save_estimator: bool = False,
save_train: bool = False,
outer_cv: CVLike | Literal[False, "dataset"] = None,
) -> None:
"""Run the experiment."""
if outer_cv is False:
_benchmark_one(
experiment=experiment,
estimator=estimator,
data=data,
save_estimator=save_estimator,
save_train=save_train,
)
else:
_benchmark_partitions(
experiment=experiment,
estimator=estimator,
data=data,
save_estimator=save_estimator,
save_train=save_train,
outer_cv=outer_cv,
)
def experiment(
dataset: Callable[..., Bunch],
estimator: Callable[..., BaseEstimator],
*,
save_estimator: bool = False,
save_train: bool = False,
) -> Experiment:
"""
Prepare a Scikit-learn experiment as a Sacred experiment.
Prepare a Scikit-learn experiment from a given dataset and estimator and
return it as a Sacred experiment.
Parameters
----------
dataset : function
Dataset fetch function. Might receive any argument. Must return a
:external:class:`sklearn.utils.Bunch` with ``data``, ``target``
(might be ``None``), ``inner_cv`` (might be ``None``) and ``outer_cv``
(might be ``None``).
estimator : function
Estimator initialization function. Might receive any keyword argument.
Must return an initialized sklearn-compatible estimator.
save_estimator : bool, default ``False``
Whether to store the fitted estimator in the experiment info.
save_train : bool, default ``False``
If ``True``, compute and store also the score over the train data.
Returns
-------
experiment : Experiment
Sacred experiment, ready to be run.
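Examples
--------
A minimal usage sketch (``load_iris``, ``DecisionTreeClassifier`` and the
``my_runs`` directory are arbitrary illustrative choices; any callables
fulfilling the contract above work):

>>> from sacred.observers import FileStorageObserver
>>> from sklearn.datasets import load_iris
>>> from sklearn.tree import DecisionTreeClassifier
>>> def dataset():
...     return load_iris()
>>> def estimator():
...     return DecisionTreeClassifier()
>>> e = experiment(dataset, estimator)  # doctest: +SKIP
>>> e.observers.append(FileStorageObserver("my_runs"))  # doctest: +SKIP
>>> e.run()  # doctest: +SKIP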
"""
dataset_ingredient = Ingredient("dataset")
dataset = dataset_ingredient.capture(dataset)
estimator_ingredient = Ingredient("estimator")
estimator = estimator_ingredient.capture(estimator)
experiment = Experiment(
ingredients=(
dataset_ingredient,
estimator_ingredient,
),
)
@experiment.main
def run() -> None:
"""Run the experiment."""
data = dataset()
# Metaparameter search
cv = getattr(data, "inner_cv", None)
try:
e = estimator(cv=cv)
except TypeError as exception:
warn(f"The estimator does not accept cv: {exception}")
e = estimator()
# Model assessment
_benchmark(
experiment=experiment,
estimator=e,
data=data,
save_estimator=save_estimator,
save_train=save_train,
)
# Ensure that everything is in the info dict at the end
# See https://github.com/IDSIA/sacred/issues/830
sleep(experiment.current_run.beat_interval + 1)
return experiment
def _get_estimator_function(
experiment: Experiment,
estimator: EstimatorLike,
) -> Callable[..., EstimatorProtocol[Any, Any]]:
if hasattr(estimator, "fit"):
def estimator_function() -> EstimatorProtocol:
return estimator
else:
estimator_function = estimator
return experiment.capture(estimator_function)
def _get_dataset_function(
experiment: Experiment,
dataset: DatasetLike,
) -> Callable[..., Bunch]:
if callable(dataset):
dataset_function = dataset
else:
def dataset_function() -> Bunch:
return dataset
return experiment.capture(dataset_function)
def _create_one_experiment(
*,
estimator_name: str,
estimator: EstimatorLike,
dataset_name: str,
dataset: DatasetLike,
storage: RunObserver,
config: ConfigLike,
inner_cv: CVLike | Literal[False, "dataset"] = None,
outer_cv: CVLike | Literal[False, "dataset"] = None,
save_estimator: bool = False,
save_train: bool = False,
) -> Experiment:
experiment = Experiment()
experiment.add_config(config)
experiment.add_config({"estimator_name": estimator_name})
if isinstance(estimator, tuple):
estimator, estimator_config = estimator
experiment.add_config(estimator_config)
experiment.add_config({"dataset_name": dataset_name})
if isinstance(dataset, tuple):
dataset, dataset_config = dataset
experiment.add_config(dataset_config)
experiment.observers.append(storage)
estimator_function = _get_estimator_function(experiment, estimator)
dataset_function = _get_dataset_function(experiment, dataset)
@experiment.main
def run() -> None:
"""Run the experiment."""
dataset = dataset_function()
# Metaparameter search
cv = dataset.inner_cv if inner_cv == "dataset" else inner_cv
estimator = estimator_function()
if hasattr(estimator, "cv") and cv is not False:
estimator.cv = cv
# Model assessment
_benchmark(
experiment=experiment,
estimator=estimator,
data=dataset,
save_estimator=save_estimator,
save_train=save_train,
outer_cv=outer_cv,
)
return experiment
def create_experiments(
*,
datasets: Mapping[str, DatasetLike],
estimators: Mapping[str, EstimatorLike],
storage: RunObserver | str,
config: ConfigLike | None = None,
inner_cv: CVLike | Literal[False, "dataset"] = False,
outer_cv: CVLike | Literal[False, "dataset"] = None,
save_estimator: bool = False,
save_train: bool = False,
) -> Sequence[Experiment]:
"""
Create several Sacred experiments.
It receives a set of estimators and datasets, and creates Sacred
experiment objects for them.
Parameters
----------
datasets : Mapping
Mapping where each key is the name for a dataset and each value
is either:
* A :external:class:`sklearn.utils.Bunch` with the fields explained
in :doc:`/structure`. Only ``data`` and ``target`` are
mandatory.
* A function receiving arbitrary config values and returning a
:external:class:`sklearn.utils.Bunch` object like the one explained
above.
* A tuple with such a function and additional configuration (either
a mapping or a filename).
estimators : Mapping
Mapping where each key is the name for an estimator and each value
is either:
* A scikit-learn compatible estimator.
* A function receiving arbitrary config values and returning a
scikit-learn compatible estimator.
* A tuple with such a function and additional configuration (either
a mapping or a filename).
storage : :external:class:`sacred.observers.RunObserver` or :class:`str`
Where the experiments will be stored. Either a Sacred observer, for
example to store in a Mongo database, or the name of a directory, to
use a file observer.
config : Mapping, :class:`str` or ``None``, default ``None``
A mapping or filename with additional configuration for the experiment.
inner_cv : CV-like object, ``"dataset"`` or ``False``, default ``False``
For estimators that perform cross validation (they have a ``cv``
parameter) this sets the cross validation strategy, as follows:
* If ``False``, the original value of ``cv`` is unchanged.
* If ``"dataset"``, the :external:class:`sklearn.utils.Bunch` objects
for the datasets must have an ``inner_cv`` attribute, which will
be the one used.
* Otherwise, ``cv`` is changed to this value.
outer_cv : CV-like object, ``"dataset"`` or ``False``, default ``None``
The strategy used to evaluate different partitions of the data, as
follows:
* If ``False``, use only one partition: the one specified in the
dataset. Thus the :external:class:`sklearn.utils.Bunch` objects
for the datasets should define at least a train and a test
partition.
* If ``"dataset"``, the :external:class:`sklearn.utils.Bunch` objects
for the datasets must have an ``outer_cv`` attribute, which will
be the one used.
* Otherwise, this will be passed to
:external:func:`sklearn.model_selection.check_cv` and the resulting
cross validator will be used to define the partitions.
save_estimator : bool, default ``False``
Whether to save the fitted estimator. This is useful for debugging
and for obtaining extra information in some cases, but for some
estimators it may require a large amount of storage.
save_train : bool, default ``False``
If ``True``, compute and store also the score over the train data.
Returns
-------
experiments : Sequence of :external:class:`sacred.Experiment`
Sequence of Sacred experiments, ready to be run.
See Also
--------
run_experiments
fetch_scores
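Examples
--------
A minimal sketch (the experiment names, the ``my_runs`` directory and the
chosen scikit-learn objects are illustrative):

>>> from sklearn.datasets import load_iris
>>> from sklearn.tree import DecisionTreeClassifier
>>> experiments = create_experiments(
...     datasets={"iris": load_iris},
...     estimators={"tree": DecisionTreeClassifier()},
...     storage="my_runs",
... )  # doctest: +SKIP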
"""
if isinstance(storage, str):
storage = FileStorageObserver(storage)
if config is None:
config = {}
return [
_create_one_experiment(
estimator_name=estimator_name,
estimator=estimator,
dataset_name=dataset_name,
dataset=dataset,
storage=storage,
config=config,
inner_cv=inner_cv,
outer_cv=outer_cv,
save_estimator=save_estimator,
save_train=save_train,
)
for estimator_name, estimator in estimators.items()
for dataset_name, dataset in datasets.items()
]
def run_experiments(
experiments: Sequence[Experiment],
) -> Sequence[int]:
"""
Run Sacred experiments.
Parameters
----------
experiments : Sequence of :external:class:`sacred.Experiment`
Sequence of Sacred experiments to be run.
Returns
-------
ids : Sequence of :external:class:`int`
Sequence of identifiers for each experiment.
See Also
--------
create_experiments
fetch_scores
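Examples
--------
A sketch continuing the :func:`create_experiments` example
(``experiments`` is assumed to come from that call):

>>> ids = run_experiments(experiments)  # doctest: +SKIP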
"""
return [e.run()._id for e in experiments]
def _loader_from_observer(
storage: RunObserver | str,
) -> ExperimentLoader | FileSystemExperimentLoader:
if isinstance(storage, str):
return FileSystemExperimentLoader(storage)
elif isinstance(storage, FileStorageObserver):
return FileSystemExperimentLoader(storage.basedir)
elif isinstance(storage, MongoObserver):
database = storage.runs.database
client = database.client
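# The Mongo observer does not keep the connection URI, so recover the
# host and port from the client's first known server description.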
url, port = list(
client.topology_description.server_descriptions().keys(),
)[0]
return ExperimentLoader(
mongo_uri=f"mongodb://{url}:{port}/",
db_name=database.name,
unpickle=False,
)
raise ValueError(f"Observer {storage} is not supported.")
def _get_experiments(
*,
storage: RunObserver | str,
ids: Sequence[int] | None = None,
dataset_names: Sequence[str] | None = None,
estimator_names: Sequence[str] | None = None,
) -> Sequence[Experiment]:
loader = _loader_from_observer(storage)
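# The file-system loader can only look experiments up by id, so for it
# the dataset/estimator name filters are applied afterwards, at the end
# of this function.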
if (
(ids, dataset_names, estimator_names) == (None, None, None)
or isinstance(loader, FileSystemExperimentLoader) and ids is None
):
find_all_fun = getattr(
loader,
"find_all",
lambda: [
FileSystemExperiment.from_run_dir(run_dir)
for run_dir in loader._runs_dir.iterdir()
],
)
experiments = find_all_fun()
elif (
(dataset_names, estimator_names) == (None, None)
or isinstance(loader, FileSystemExperimentLoader)
):
load_ids_fun = getattr(
loader,
"find_by_ids",
lambda id_seq: [
loader.find_by_id(experiment_id)
for experiment_id in id_seq
],
)
experiments = load_ids_fun(ids)
else:
conditions: List[
Mapping[
str,
Mapping[str, Sequence[Any]],
]
] = []
if ids is not None:
conditions.append({"_id": {"$in": ids}})
if estimator_names is not None:
conditions.append(
{"config.estimator_name": {"$in": estimator_names}})
if dataset_names is not None:
conditions.append({"config.dataset_name": {"$in": dataset_names}})
query = {"$and": conditions}
experiments = loader.find(query)
if isinstance(loader, FileSystemExperimentLoader):
# Filter experiments by dataset and estimator names
experiments = [
e for e in experiments
if (
(
estimator_names is None
or e.config["estimator_name"] in estimator_names
)
and (
dataset_names is None
or e.config["dataset_name"] in dataset_names
)
)
]
return experiments
def fetch_scores(
*,
storage: RunObserver | str,
ids: Sequence[int] | None = None,
dataset_names: Sequence[str] | None = None,
estimator_names: Sequence[str] | None = None,
) -> ScoresInfo:
"""
Fetch scores from Sacred experiments.
By default, it retrieves every experiment. The parameters ``ids``,
``estimator_names`` and ``dataset_names`` can be used to restrict the
number of experiments returned.
Parameters
----------
storage : :external:class:`sacred.observers.RunObserver` or :class:`str`
Where the experiments are stored. Either a Sacred observer, for
example for a Mongo database, or the name of a directory, to
use a file observer.
ids : Sequence of :external:class:`int` or ``None``, default ``None``
If not ``None``, return only experiments whose id is contained
in the sequence.
dataset_names : Sequence of :class:`str` or ``None``, default ``None``
If not ``None``, return only experiments whose dataset names are
contained in the sequence.
The order of the names is also the one used for datasets when
combining the results.
estimator_names : Sequence of :class:`str` or ``None``, default ``None``
If not ``None``, return only experiments whose estimator names are
contained in the sequence.
The order of the names is also the one used for estimators when
combining the results.
Returns
-------
info : :class:`ScoresInfo`
Class containing information about experiments scores.
See Also
--------
run_experiments
create_experiments
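Examples
--------
A sketch, assuming experiments named as in the
:func:`create_experiments` example were already run and stored under the
``my_runs`` directory:

>>> info = fetch_scores(
...     storage="my_runs",
...     dataset_names=("iris",),
...     estimator_names=("tree",),
... )  # doctest: +SKIP
>>> info.scores_mean.shape  # doctest: +SKIP
(1, 1)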
"""
experiments = _get_experiments(
storage=storage,
ids=ids,
dataset_names=dataset_names,
estimator_names=estimator_names,
)
dict_experiments: Dict[
str,
Dict[str, Tuple[np.typing.NDArray[float], float, float]],
] = {}
estimator_list = []
dataset_list = []
nobs = 0
for experiment in experiments:
estimator_name = experiment.config["estimator_name"]
if estimator_name not in estimator_list:
estimator_list.append(estimator_name)
dataset_name = experiment.config["dataset_name"]
if dataset_name not in dataset_list:
dataset_list.append(dataset_name)
scores = experiment.info.get("test_score", np.array([]))
score_mean = experiment.info.get("score_mean", np.nan)
score_std = experiment.info.get("score_std", np.nan)
nobs = max(nobs, len(scores))
assert np.isnan(score_mean) or np.isclose(score_mean, np.nanmean(scores))
assert np.isnan(score_std) or np.isclose(score_std, np.nanstd(scores))
if estimator_name not in dict_experiments:
dict_experiments[estimator_name] = {}
if dataset_name in dict_experiments[estimator_name]:
raise ValueError(
f"Repeated experiment: ({estimator_name}, {dataset_name})",
)
dict_experiments[estimator_name][dataset_name] = (
scores,
score_mean,
score_std,
)
estimator_names = (
tuple(estimator_list) if estimator_names is None else estimator_names
)
dataset_names = (
tuple(dataset_list) if dataset_names is None else dataset_names
)
matrix_shape = (len(dataset_names), len(estimator_names))
scores = np.full(matrix_shape + (nobs,), np.nan)
scores_mean = np.full(matrix_shape, np.nan)
scores_std = np.full(matrix_shape, np.nan)
for i, dataset_name in enumerate(dataset_names):
for j, estimator_name in enumerate(estimator_names):
dict_estimator = dict_experiments.get(estimator_name, {})
s, mean, std = dict_estimator.get(
dataset_name,
(np.array([]), np.nan, np.nan),
)
if len(s) == nobs:
scores[i, j] = s
scores_mean[i, j] = mean
scores_std[i, j] = std
scores = np.array(scores.tolist())
return ScoresInfo(
dataset_names=dataset_names,
estimator_names=estimator_names,
scores=scores,
scores_mean=scores_mean,
scores_std=scores_std,
)