from __future__ import annotations
from functools import singledispatch
from typing import TYPE_CHECKING
import ehrdata as ed
import numpy as np
import sklearn.preprocessing as sklearn_pp
from ehrdata.core.constants import FEATURE_TYPE_KEY, NUMERIC_TAG
from ehrapy._compat import (
DaskArray,
_apply_over_time_axis,
_raise_array_type_not_implemented,
use_ehrdata,
)
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
import pandas as pd
from anndata import AnnData
from ehrdata import EHRData
def _scale_func_group(
edata: EHRData | AnnData,
scale_func: Callable[[np.ndarray | pd.DataFrame], np.ndarray],
vars: str | Sequence[str] | None,
group_key: str | None,
layer: str | None,
copy: bool,
norm_name: str,
) -> EHRData | AnnData | None:
"""Apply scaling function to selected columns of edata, either globally or per group.
Supports both 2D and 3D data with unified layer handling.
"""
if group_key is not None and group_key not in edata.obs:
raise KeyError(f"group key '{group_key}' not found in edata.obs.")
if copy:
edata = edata.copy()
if FEATURE_TYPE_KEY not in edata.var.columns:
ed.infer_feature_types(edata, layer=layer, output=None)
if isinstance(vars, str):
vars = [vars]
if vars is None:
vars = edata.var_names[edata.var[FEATURE_TYPE_KEY] == NUMERIC_TAG].tolist()
else:
numeric_vars = edata.var_names[edata.var[FEATURE_TYPE_KEY] == NUMERIC_TAG].tolist()
if not set(vars) <= set(numeric_vars):
raise ValueError("Some selected vars are not numeric")
# Get numeric indices (positions) of the variables to normalize
var_indices = edata.var_names.get_indexer(vars)
X = edata.X if layer is None else edata.layers[layer]
if np.issubdtype(X.dtype, np.integer):
X = X.astype(np.float32)
if group_key is None:
X[:, var_indices] = scale_func(X[:, var_indices])
else:
# Group-wise normalization is not supported for Dask arrays
if isinstance(X, DaskArray):
raise NotImplementedError(
f"Group-wise normalization with group_key='{group_key}' does not support array type {type(X)}. "
"Please convert to numpy array first or use normalization without group_key."
)
for group in edata.obs[group_key].unique():
group_mask = np.where(edata.obs[group_key] == group)[0]
X[np.ix_(group_mask, var_indices)] = scale_func(X[np.ix_(group_mask, var_indices)])
if layer is None:
edata.X = X
else:
edata.layers[layer] = X
_record_norm(edata, vars, norm_name)
return edata if copy else None
@singledispatch
def _scale_norm_function(arr, **kwargs):
_raise_array_type_not_implemented(_scale_norm_function, type(arr))
@_scale_norm_function.register(np.ndarray)
@_apply_over_time_axis
def _(arr: np.ndarray, **kwargs):
return sklearn_pp.StandardScaler(**kwargs).fit_transform(arr)
@_scale_norm_function.register(DaskArray)
@_apply_over_time_axis
def _(arr: DaskArray, **kwargs):
import dask_ml.preprocessing as daskml_pp
return daskml_pp.StandardScaler(**kwargs).fit_transform(arr)
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def scale_norm(
edata: EHRData | AnnData,
vars: str | Sequence[str] | None = None,
group_key: str | None = None,
layer: str | None = None,
copy: bool = False,
**kwargs,
) -> EHRData | AnnData | None:
"""Apply scaling normalization.
Functionality is provided by :class:`~sklearn.preprocessing.StandardScaler`, see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html for details.
If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.StandardScaler`, see https://ml.dask.org/modules/generated/dask_ml.preprocessing.StandardScaler.html for details.
Supports both 2D and 3D data:
- 2D data: Standard normalization across observations
- 3D data: Per-variable normalization across samples and timestamps
Args:
edata: Central data object. Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
vars: List of the names of the numeric variables to normalize.
If None all numeric variables will be normalized.
group_key: Key in edata.obs that contains group information. If provided, scaling is applied per group.
layer: The layer to normalize.
copy: Whether to return a copy or act in place.
**kwargs: Additional arguments passed to the StandardScaler.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> np.nanmean(edata.layers["tem_data"])
74.213570
>>> ep.pp.scale_norm(edata, layer="tem_data")
>>> np.nanmean(edata.layers["tem_data"])
0.0
"""
scale_func = lambda arr: _scale_norm_function(arr, **kwargs)
return _scale_func_group(
edata=edata,
scale_func=scale_func,
vars=vars,
group_key=group_key,
layer=layer,
copy=copy,
norm_name="scale",
)
@singledispatch
def _minmax_norm_function(arr, **kwargs):
_raise_array_type_not_implemented(_minmax_norm_function, type(arr))
@_minmax_norm_function.register(np.ndarray)
@_apply_over_time_axis
def _(arr: np.ndarray, **kwargs):
return sklearn_pp.MinMaxScaler(**kwargs).fit_transform(arr)
@_minmax_norm_function.register(DaskArray)
@_apply_over_time_axis
def _(arr: DaskArray, **kwargs):
import dask_ml.preprocessing as daskml_pp
return daskml_pp.MinMaxScaler(**kwargs).fit_transform(arr)
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def minmax_norm(
edata: EHRData | AnnData,
vars: str | Sequence[str] | None = None,
group_key: str | None = None,
layer: str | None = None,
copy: bool = False,
**kwargs,
) -> EHRData | AnnData | None:
"""Apply min-max normalization.
Functionality is provided by :class:`~sklearn.preprocessing.MinMaxScaler`, see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html for details.
If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.MinMaxScaler`, see https://ml.dask.org/modules/generated/dask_ml.preprocessing.MinMaxScaler.html for details.
Supports both 2D and 3D data:
- 2D data: Standard normalization across observations
- 3D data: Per-variable normalization across samples and timestamps
Args:
edata: Central data object.
Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
vars: List of the names of the numeric variables to normalize.
If None all numeric variables will be normalized.
group_key: Key in edata.obs that contains group information. If provided, scaling is applied per group.
layer: The layer to normalize.
copy: Whether to return a copy or act in place.
**kwargs: Additional arguments passed to the MinMaxScaler.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
(-17.8, 36400.0)
>>> ep.pp.minmax_norm(edata, layer="tem_data")
>>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
(0.0, 1.0)
"""
scale_func = lambda arr: _minmax_norm_function(arr, **kwargs)
return _scale_func_group(
edata=edata,
scale_func=scale_func,
vars=vars,
group_key=group_key,
layer=layer,
copy=copy,
norm_name="minmax",
)
@singledispatch
def _maxabs_norm_function(arr):
_raise_array_type_not_implemented(_maxabs_norm_function, type(arr))
@_maxabs_norm_function.register(np.ndarray)
@_apply_over_time_axis
def _(arr: np.ndarray):
return sklearn_pp.MaxAbsScaler().fit_transform(arr)
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def maxabs_norm(
edata: EHRData | AnnData,
vars: str | Sequence[str] | None = None,
group_key: str | None = None,
layer: str | None = None,
copy: bool = False,
) -> EHRData | AnnData | None:
"""Apply max-abs normalization.
Functionality is provided by :class:`~sklearn.preprocessing.MaxAbsScaler`, see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MaxAbsScaler.html for details.
Note: Dask arrays are not supported for this function. Please convert to numpy array first.
Supports both 2D and 3D data:
- 2D data: Standard normalization across observations
- 3D data: Per-variable normalization across samples and timestamps
Args:
edata: Central data object.
Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
vars: List of the names of the numeric variables to normalize.
If None all numeric variables will be normalized.
group_key: Key in edata.obs that contains group information. If provided, scaling is applied per group.
layer: The layer to normalize.
copy: Whether to return a copy or act in place.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> np.nanmax(np.abs(edata.layers["tem_data"]))
36400.0
>>> ep.pp.maxabs_norm(edata, layer="tem_data")
>>> np.nanmax(np.abs(edata.layers["tem_data"]))
1.0
"""
X = edata.X if layer is None else edata.layers[layer]
if isinstance(X, DaskArray):
_raise_array_type_not_implemented(_maxabs_norm_function, type(X))
scale_func = _maxabs_norm_function
return _scale_func_group(
edata=edata,
scale_func=scale_func,
vars=vars,
group_key=group_key,
layer=layer,
copy=copy,
norm_name="maxabs",
)
@singledispatch
def _robust_scale_norm_function(arr, **kwargs):
_raise_array_type_not_implemented(_robust_scale_norm_function, type(arr))
@_robust_scale_norm_function.register(np.ndarray)
@_apply_over_time_axis
def _(arr: np.ndarray, **kwargs):
return sklearn_pp.RobustScaler(**kwargs).fit_transform(arr)
@_robust_scale_norm_function.register(DaskArray)
@_apply_over_time_axis
def _(arr: DaskArray, **kwargs):
import dask_ml.preprocessing as daskml_pp
return daskml_pp.RobustScaler(**kwargs).fit_transform(arr)
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def robust_scale_norm(
edata: EHRData | AnnData,
vars: str | Sequence[str] | None = None,
group_key: str | None = None,
layer: str | None = None,
copy: bool = False,
**kwargs,
) -> EHRData | AnnData | None:
"""Apply robust scaling normalization.
Functionality is provided by :class:`~sklearn.preprocessing.RobustScaler`,
see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.RobustScaler.html for details.
If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.RobustScaler`, see https://ml.dask.org/modules/generated/dask_ml.preprocessing.RobustScaler.html for details.
Supports both 2D and 3D data:
- 2D data: Standard normalization across observations
- 3D data: Per-variable normalization across samples and timestamps
Args:
edata: Central data object.
Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
vars: List of the names of the numeric variables to normalize.
If None all numeric variables will be normalized.
group_key: Key in edata.obs that contains group information. If provided, scaling is applied per group.
layer: The layer to normalize.
copy: Whether to return a copy or act in place.
**kwargs: Additional arguments passed to the RobustScaler.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> np.nanmedian(edata.layers["tem_data"])
69.0
>>> ep.pp.robust_scale_norm(edata, layer="tem_data")
>>> np.nanmedian(edata.layers["tem_data"])
0.0
"""
scale_func = lambda arr: _robust_scale_norm_function(arr, **kwargs)
return _scale_func_group(
edata=edata,
scale_func=scale_func,
vars=vars,
group_key=group_key,
layer=layer,
copy=copy,
norm_name="robust_scale",
)
@singledispatch
def _quantile_norm_function(arr, **kwargs):
_raise_array_type_not_implemented(_quantile_norm_function, type(arr))
@_quantile_norm_function.register(np.ndarray)
@_apply_over_time_axis
def _(arr: np.ndarray, **kwargs):
return sklearn_pp.QuantileTransformer(**kwargs).fit_transform(arr)
@_quantile_norm_function.register(DaskArray)
@_apply_over_time_axis
def _(arr: DaskArray, **kwargs):
import dask_ml.preprocessing as daskml_pp
return daskml_pp.QuantileTransformer(**kwargs).fit_transform(arr)
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def quantile_norm(
edata: EHRData | AnnData,
vars: str | Sequence[str] | None = None,
group_key: str | None = None,
layer: str | None = None,
copy: bool = False,
**kwargs,
) -> EHRData | AnnData | None:
"""Apply quantile normalization.
Functionality is provided by :class:`~sklearn.preprocessing.QuantileTransformer`,
see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.QuantileTransformer.html for details.
If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.QuantileTransformer`, see https://ml.dask.org/modules/generated/dask_ml.preprocessing.QuantileTransformer.html for details.
Supports both 2D and 3D data:
- 2D data: Standard normalization across observations
- 3D data: Per-variable normalization across samples and timestamps
Args:
edata: Central data object. Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
vars: List of the names of the numeric variables to normalize.
If None all numeric variables will be normalized.
group_key: Key in edata.obs that contains group information. If provided, scaling is applied per group.
layer: The layer to normalize.
copy: Whether to return a copy or act in place.
**kwargs: Additional arguments passed to the QuantileTransformer.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
(-17.8, 36400.0)
>>> ep.pp.quantile_norm(edata, layer="tem_data")
>>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
(0.0, 1.0)
"""
scale_func = lambda arr: _quantile_norm_function(arr, **kwargs)
return _scale_func_group(
edata=edata,
scale_func=scale_func,
vars=vars,
group_key=group_key,
layer=layer,
copy=copy,
norm_name="quantile",
)
@singledispatch
def _power_norm_function(arr, **kwargs):
_raise_array_type_not_implemented(_power_norm_function, type(arr))
@_power_norm_function.register(np.ndarray)
@_apply_over_time_axis
def _(arr: np.ndarray, **kwargs):
return sklearn_pp.PowerTransformer(**kwargs).fit_transform(arr)
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def power_norm(
edata: EHRData | AnnData,
vars: str | Sequence[str] | None = None,
group_key: str | None = None,
layer: str | None = None,
copy: bool = False,
**kwargs,
) -> EHRData | AnnData | None:
"""Apply power transformation normalization.
Functionality is provided by :class:`~sklearn.preprocessing.PowerTransformer`,
see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.PowerTransformer.html for details.
Note: Dask arrays are not supported for this function. Please convert to numpy array first.
Supports both 2D and 3D data:
- 2D data: Standard normalization across observations
- 3D data: Per-variable normalization across samples and timestamps
Args:
edata: Central data object.
Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
vars: List of the names of the numeric variables to normalize.
If None all numeric variables will be normalized.
group_key: Key in edata.obs that contains group information. If provided, scaling is applied per group.
layer: The layer to normalize.
copy: Whether to return a copy or act in place.
**kwargs: Additional arguments passed to the PowerTransformer.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> from scipy import stats
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> ep.pp.offset_negative_values(edata, layer="tem_data")
>>> skewed_data = np.power(edata.layers["tem_data"], 2)
>>> edata.layers["tem_data"] = skewed_data
>>> stats.skew(edata.layers["tem_data"].flatten())
504.250727
>>> ep.pp.power_norm(edata, layer="tem_data")
>>> stats.skew(edata.layers["tem_data"].flatten())
0.144324
"""
X = edata.X if layer is None else edata.layers[layer]
if isinstance(X, DaskArray):
_raise_array_type_not_implemented(_power_norm_function, type(X))
scale_func = lambda arr: _power_norm_function(arr, **kwargs)
return _scale_func_group(
edata=edata,
scale_func=scale_func,
vars=vars,
group_key=group_key,
layer=layer,
copy=copy,
norm_name="power",
)
@singledispatch
def _log_norm_function(arr, offset: int | float = 1, base: int | float | None = None):
_raise_array_type_not_implemented(_log_norm_function, type(arr))
@_log_norm_function.register(np.ndarray)
@_apply_over_time_axis
def _(arr: np.ndarray, offset: int | float = 1, base: int | float | None = None) -> np.ndarray:
if offset == 1:
np.log1p(arr, out=arr)
else:
np.add(arr, offset, out=arr)
np.log(arr, out=arr)
if base is not None:
np.divide(arr, np.log(base), out=arr)
return arr
@_log_norm_function.register(DaskArray)
@_apply_over_time_axis
def _(arr: DaskArray, offset: int | float = 1, base: int | float | None = None) -> DaskArray:
import dask.array as da
if offset == 1:
result = da.log1p(arr)
else:
result = da.log(arr + offset)
if base is not None:
result = result / np.log(base)
return result
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def log_norm(
edata: EHRData | AnnData,
vars: str | Sequence[str] | None = None,
base: int | float | None = None,
offset: int | float = 1,
layer: str | None = None,
copy: bool = False,
) -> EHRData | AnnData | None:
r"""Apply log normalization.
Computes :math:`x = \\log(x + offset)`, where :math:`log` denotes the natural logarithm
unless a different base is given and the default :math:`offset` is :math:`1`.
Supports both 2D and 3D data:
- 2D data: Standard normalization across observations
- 3D data: Applied to all elements across samples and timestamps
Args:
edata: Central data object.
vars: List of the names of the numeric variables to normalize.
If None all numeric variables will be normalized.
base: Numeric base for logarithm. If None the natural logarithm is used.
offset: Offset added to values before computing the logarithm.
layer: The layer to normalize.
copy: Whether to return a copy or act in place.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> ep.pp.offset_negative_values(edata, layer="tem_data")
>>> np.nanmax(edata.layers["tem_data"])
36400.0
>>> ep.pp.log_norm(edata, layer="tem_data")
>>> np.nanmax(edata.layers["tem_data"])
10.502379
"""
if copy:
edata = edata.copy()
if FEATURE_TYPE_KEY not in edata.var.columns:
ed.infer_feature_types(edata, layer=layer, output=None)
if isinstance(vars, str):
vars = [vars]
if vars is None:
vars = edata.var_names[edata.var[FEATURE_TYPE_KEY] == NUMERIC_TAG].tolist()
else:
numeric_vars = edata.var_names[edata.var[FEATURE_TYPE_KEY] == NUMERIC_TAG].tolist()
if not set(vars) <= set(numeric_vars):
raise ValueError("Some selected vars are not numeric")
X = edata.X if layer is None else edata.layers[layer]
if vars:
var_indices = edata.var_names.get_indexer(vars)
check_data = X[:, var_indices] if X.ndim == 2 else X[:, var_indices, :]
else:
check_data = X
offset_tmp_applied = check_data + offset
if np.any(offset_tmp_applied < 0):
data_type = f"Layer '{layer}'" if layer else "Matrix X"
raise ValueError(
f"{data_type} contains negative values. "
"Undefined behavior for log normalization. "
"Please specify a higher offset to this function "
"or offset negative values with ep.pp.offset_negative_values()."
)
if vars:
var_indices = edata.var_names.get_indexer(vars)
var_values = X[:, var_indices] if X.ndim == 2 else X[:, var_indices, :]
transformed_values = _log_norm_function(var_values, offset=offset, base=base)
if layer is None:
edata.X[:, var_indices] = transformed_values
else:
if X.ndim == 3:
edata.layers[layer][:, var_indices, :] = transformed_values
else:
edata.layers[layer][:, var_indices] = transformed_values
else:
transformed_values = _log_norm_function(X, offset=offset, base=base)
if layer is None:
edata.X = transformed_values
else:
edata.layers[layer] = transformed_values
_record_norm(edata, vars, "log")
return edata if copy else None
def _record_norm(edata: EHRData | AnnData, vars: Sequence[str], method: str) -> None:
if "normalization" in edata.uns:
norm_record = edata.uns["normalization"]
else:
norm_record = {}
for var in vars:
if var in norm_record.keys():
norm_record[var].append(method)
else:
norm_record[var] = [method]
edata.uns["normalization"] = norm_record
return None
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def offset_negative_values(edata: EHRData | AnnData, layer: str = None, copy: bool = False) -> EHRData | AnnData | None:
"""Offsets negative values into positive ones with the lowest negative value becoming 0.
This is primarily used to enable the usage of functions such as log_norm that
do not allow negative values for mathematical or technical reasons.
Supports both 2D and 3D data:
- 2D data: Standard offset across observations
- 3D data: Applied to all elements across samples and timestamps
Args:
edata: Central data object.
layer: The layer to offset.
copy: Whether to return a modified copy of the data object.
Returns:
`None` if `copy=False` and modifies the passed edata, else returns an updated object.
Examples:
>>> import ehrdata as ed
>>> import ehrapy as ep
>>> import numpy as np
>>> edata = ed.dt.physionet2012(layer="tem_data")
>>> np.nanmin(edata.layers["tem_data"])
-17.8
>>> ep.pp.offset_negative_values(edata, layer="tem_data")
>>> np.nanmin(edata.layers["tem_data"])
0.0
"""
if copy:
edata = edata.copy()
X = edata.X if layer is None else edata.layers[layer]
minimum = np.nanmin(X)
if minimum < 0:
np.add(X, np.abs(minimum), out=X)
return edata if copy else None