from __future__ import annotations
import random
from collections import OrderedDict
from string import ascii_letters
from typing import TYPE_CHECKING, Any, NamedTuple
import numpy as np
import pandas as pd
from anndata import AnnData, concat
from lamin_utils import logger
from scanpy.get import obs_df, rank_genes_groups_df, var_df
from scipy import sparse
from scipy.sparse import issparse
from ehrapy.anndata import check_feature_types
from ehrapy.anndata._constants import FEATURE_TYPE_KEY, NUMERIC_TAG
if TYPE_CHECKING:
from collections.abc import Collection, Iterable, Sequence
class BaseDataframes(NamedTuple):
obs: pd.DataFrame
df: pd.DataFrame
def df_to_anndata(
df: pd.DataFrame, columns_obs_only: list[str] | None = None, index_column: str | int | None = None
) -> AnnData:
"""Transform a given Pandas DataFrame into an AnnData object.
Note that columns containing boolean values (0/1 or true/false in any capitalization)
will be stored as boolean columns, whereas all other non-numerical columns will be stored as categorical.
Args:
df: The pandas dataframe to be transformed.
columns_obs_only: An optional list of column names that should belong to obs only and not X.
index_column: The index column of obs. This can be a column name, its integer position in the DataFrame, or the name of the DataFrame's existing index.
Returns:
An AnnData object created from the given Pandas DataFrame.
Examples:
>>> import ehrapy as ep
>>> import pandas as pd
>>> df = pd.DataFrame(
... {
... "patient_id": ["0", "1", "2", "3", "4"],
... "age": [65, 72, 58, 78, 82],
... "sex": ["M", "F", "F", "M", "F"],
... }
... )
>>> adata = ep.ad.df_to_anndata(df, index_column="patient_id")
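A sketch of the boolean/categorical handling described above; the "smoker" column is illustrative:
>>> df["smoker"] = [True, False, False, True, True]
>>> adata = ep.ad.df_to_anndata(df, index_column="patient_id", columns_obs_only=["sex", "smoker"])
>>> dtypes = adata.obs.dtypes  # "smoker" stays bool, "sex" becomes category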
"""
# Check and handle the overlap of index_column in columns_obs_only
if index_column is not None:
if isinstance(index_column, int):
if index_column >= len(df.columns):
raise IndexError("index_column integer index is out of bounds.")
index_column = df.columns[index_column]
if not df.index.name or df.index.name != index_column:
if index_column in df.columns:
df.set_index(index_column, inplace=True)
else:
raise ValueError(f"Column {index_column} not found in DataFrame.")
# Now handle columns_obs_only with consideration of the new index
if columns_obs_only:
if index_column in columns_obs_only:
columns_obs_only.remove(index_column)
missing_cols = [col for col in columns_obs_only if col not in df.columns]
if missing_cols:
raise ValueError(f"Columns {missing_cols} specified in columns_obs_only are not in the DataFrame.")
obs = df.loc[:, columns_obs_only].copy()
df.drop(columns=columns_obs_only, inplace=True, errors="ignore")
else:
obs = pd.DataFrame(index=df.index)
for col in obs.columns:
if obs[col].dtype == "bool":
obs[col] = obs[col].astype(bool)
elif obs[col].dtype == "object":
obs[col] = obs[col].astype("category")
# Prepare the AnnData object
X = df.to_numpy(copy=True)
obs.index = obs.index.astype(str)
var = pd.DataFrame(index=df.columns)
var.index = var.index.astype(str)
uns = OrderedDict() # type: ignore
# Handle dtype of X based on presence of numerical columns only
all_numeric = df.select_dtypes(include=[np.number]).shape[1] == df.shape[1]
X = X.astype(np.float32 if all_numeric else object)
adata = AnnData(X=X, obs=obs, var=var, uns=uns, layers={"original": X.copy()})
adata.obs_names = adata.obs_names.astype(str)
adata.var_names = adata.var_names.astype(str)
return adata
def anndata_to_df(
adata: AnnData,
layer: str | None = None,
obs_cols: Iterable[str] | str | None = None,
var_cols: Iterable[str] | str | None = None,
) -> pd.DataFrame:
"""Transform an AnnData object to a Pandas DataFrame.
Args:
adata: The AnnData object to be transformed into a pandas DataFrame
layer: The layer to access the values of. If not specified, it uses the `X` matrix.
obs_cols: The columns of `obs` to add to the DataFrame.
var_cols: The columns of `var` to fetch values from.
Returns:
The AnnData object as a pandas DataFrame
Examples:
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> df = ep.ad.anndata_to_df(adata)
"""
if layer is not None:
X = adata.layers[layer]
else:
X = adata.X
if issparse(X): # pragma: no cover
X = X.toarray()
df = pd.DataFrame(X, columns=list(adata.var_names))
if obs_cols:
if len(adata.obs.columns) == 0:
raise ValueError("Cannot slice columns from empty obs!")
if isinstance(obs_cols, str):
obs_cols = [obs_cols]
if isinstance(obs_cols, list): # pragma: no cover
obs_slice = adata.obs[obs_cols]
# reset index needed since we slice all or at least some columns from obs DataFrame
obs_slice = obs_slice.reset_index(drop=True)
df = pd.concat([df, obs_slice], axis=1)
if var_cols:
if len(adata.var.columns) == 0:
raise ValueError("Cannot slice columns from empty var!")
if isinstance(var_cols, str):
var_cols = [var_cols]
if isinstance(var_cols, list):
var_slice = adata.var[var_cols]
# reset index needed since we slice all or at least some columns from var DataFrame
var_slice = var_slice.reset_index(drop=True)
df = pd.concat([df, var_slice], axis=1)
return df
def move_to_obs(adata: AnnData, to_obs: list[str] | str, copy_obs: bool = False) -> AnnData:
"""Move inplace or copy features from X to obs.
Note that columns containing boolean values (0/1 or true/false in any capitalization)
will be stored as boolean columns, whereas all other non-numerical columns will be stored as categorical.
Args:
adata: The AnnData object
to_obs: The columns to move to obs
copy_obs: The values are copied to obs (and therefore kept in X) instead of moved completely
Returns:
The original AnnData object with moved or copied columns from X to obs
Examples:
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> ep.ad.move_to_obs(adata, ["age"], copy_obs=False)
"""
if isinstance(to_obs, str): # pragma: no cover
to_obs = [to_obs]
# don't allow moving encoded columns as this could lead to inconsistent data in X and obs
if any(column.startswith("ehrapycat") for column in to_obs):
raise ValueError(
"Cannot move encoded columns from X to obs. Either undo encoding or remove them from the list!"
)
if not all(elem in adata.var_names.values for elem in to_obs):
raise ValueError(
f"Columns `{[col for col in to_obs if col not in adata.var_names.values]}` are not in var_names."
)
cols_to_obs_indices = adata.var_names.isin(to_obs)
num_set = _get_var_indices_for_type(adata, NUMERIC_TAG)
var_num = list(set(to_obs) & set(num_set))
if copy_obs:
cols_to_obs = adata[:, cols_to_obs_indices].to_df()
adata.obs = adata.obs.join(cols_to_obs)
adata.obs[var_num] = adata.obs[var_num].apply(pd.to_numeric, downcast="float")
adata.obs = _cast_obs_columns(adata.obs)
else:
df = adata[:, cols_to_obs_indices].to_df()
adata._inplace_subset_var(~cols_to_obs_indices)
adata.obs = adata.obs.join(df)
adata.obs[var_num] = adata.obs[var_num].apply(pd.to_numeric, downcast="float")
adata.obs = _cast_obs_columns(adata.obs)
return adata
@check_feature_types
def _get_var_indices_for_type(adata: AnnData, tag: str) -> list[str]:
"""Get indices of columns in var for a given tag.
Args:
adata: The AnnData object
tag: The tag to search for, should be one of 'CATEGORICAL_TAG', 'NUMERIC_TAG', 'DATE_TAG'
Returns:
List of var names whose feature type matches the given tag
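Example (a minimal sketch; assumes feature types are stored in or inferred for `adata.var[FEATURE_TYPE_KEY]`):
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> numeric_vars = _get_var_indices_for_type(adata, NUMERIC_TAG)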
"""
return adata.var_names[adata.var[FEATURE_TYPE_KEY] == tag].tolist()
def delete_from_obs(adata: AnnData, to_delete: list[str]) -> AnnData:
"""Delete features from obs.
Args:
adata: The AnnData object
to_delete: The columns to delete from obs
Returns:
The original AnnData object with deleted columns from obs.
Examples:
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> ep.ad.move_to_obs(adata, ["age"], copy_obs=True)
>>> ep.ad.delete_from_obs(adata, ["age"])
"""
if isinstance(to_delete, str): # pragma: no cover
to_delete = [to_delete]
if not all(elem in adata.obs.columns.values for elem in to_delete):
raise ValueError(
f"Columns `{[col for col in to_delete if col not in adata.obs.columns.values]}` are not in obs."
)
adata.obs = adata.obs[adata.obs.columns[~adata.obs.columns.isin(to_delete)]]
return adata
def move_to_x(adata: AnnData, to_x: list[str] | str, copy_x: bool = False) -> AnnData:
"""Move features from obs to X inplace.
Args:
adata: The AnnData object
to_x: The columns to move to X
copy_x: The values are copied to X (and therefore kept in obs) instead of moved completely
Returns:
A new AnnData object with moved columns from obs to X. This should not be used for datetime columns currently.
Examples:
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> ep.ad.move_to_obs(adata, ["age"], copy_obs=False)
>>> new_adata = ep.ad.move_to_x(adata, ["age"])
"""
if isinstance(to_x, str): # pragma: no cover
to_x = [to_x]
if not all(elem in adata.obs.columns.values for elem in to_x):
raise ValueError(f"Columns `{[col for col in to_x if col not in adata.obs.columns.values]}` are not in obs.")
cols_present_in_x = []
cols_not_in_x = []
for col in to_x:
if col in set(adata.var_names):
cols_present_in_x.append(col)
else:
cols_not_in_x.append(col)
if cols_present_in_x:
logger.warning(
f"Columns `{cols_present_in_x}` are already in X. Skipped moving `{cols_present_in_x}` to X. "
f"If you want to permanently delete these columns from obs, please use the function delete_from_obs()."
)
if cols_not_in_x:
new_adata = concat([adata, AnnData(adata.obs[cols_not_in_x])], axis=1)
if copy_x:
new_adata.obs = adata.obs
else:
new_adata.obs = adata.obs[adata.obs.columns[~adata.obs.columns.isin(cols_not_in_x)]]
# AnnData's concat discards var if they don't match in their keys, so we need to create a new var
created_var = pd.DataFrame(index=cols_not_in_x)
new_adata.var = pd.concat([adata.var, created_var], axis=0)
else:
new_adata = adata
return new_adata
def get_column_indices(adata: AnnData, col_names: str | Iterable[str]) -> list[int]:
"""Fetches the column indices in X for a given list of column names
Args:
adata: :class:`~anndata.AnnData` object.
col_names: Column names to extract the indices for.
Returns:
List of column indices.
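Example (a minimal sketch; the "age" column exists in the encoded MIMIC-II demo data):
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> idx = get_column_indices(adata, ["age"])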
"""
col_names = [col_names] if isinstance(col_names, str) else col_names
mask = np.isin(adata.var_names, col_names)
indices = np.where(mask)[0].tolist()
return indices
def _assert_encoded(adata: AnnData):
"""Raise a NotEncodedError if `adata.X` has not yet been encoded to a numeric dtype."""
if not np.issubdtype(adata.X.dtype, np.number):
raise NotEncodedError("The AnnData object has not yet been encoded.")
@check_feature_types
def get_numeric_vars(adata: AnnData) -> list[str]:
"""Fetches the column names for numeric variables in X.
Args:
adata: :class:`~anndata.AnnData` object
Returns:
List of numeric column names
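Example (a minimal sketch using the encoded MIMIC-II demo data):
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> numeric_columns = get_numeric_vars(adata)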
"""
_assert_encoded(adata)
return _get_var_indices_for_type(adata, NUMERIC_TAG)
def assert_numeric_vars(adata: AnnData, vars: Sequence[str]):
"""Assert that all given vars are numeric in the given AnnData object; raise a ValueError otherwise."""
num_vars = get_numeric_vars(adata)
if not set(vars) <= set(num_vars):
raise ValueError("Some selected vars are not numeric")
def set_numeric_vars(
adata: AnnData, values: np.ndarray, vars: Sequence[str] | None = None, copy: bool = False
) -> AnnData | None:
"""Sets the numeric values in given column names in X.
Args:
adata: :class:`~anndata.AnnData` object
values: Matrix containing the replacement values
vars: List of names of the numeric variables to replace. If `None` they will be detected using :func:`~ehrapy.preprocessing.get_numeric_vars`.
copy: Whether to return a copy with the normalized data.
Returns:
:class:`~anndata.AnnData` object with updated X
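Example (a minimal sketch; doubling the numeric values is purely illustrative):
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> num_vars = get_numeric_vars(adata)
>>> new_values = adata[:, num_vars].X * 2
>>> adata = set_numeric_vars(adata, new_values, vars=num_vars)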
"""
_assert_encoded(adata)
if vars is None:
vars = get_numeric_vars(adata)
else:
assert_numeric_vars(adata, vars)
if not np.issubdtype(values.dtype, np.number):
raise TypeError(f"Values must be numeric (current dtype is {values.dtype})")
n_values = values.shape[1]
if n_values != len(vars):
raise ValueError(f"Number of values ({n_values}) does not match number of vars ({len(vars)})")
if copy:
adata = adata.copy()
vars_idx = get_column_indices(adata, vars)
adata.X[:, vars_idx] = values
return adata
def _detect_binary_columns(df: pd.DataFrame, numerical_columns: list[str]) -> list[str]:
"""Detect all columns that contain only 0 and 1 (besides NaNs).
Args:
df: The dataframe to check.
numerical_columns: All numerical columns of the dataframe.
Returns:
List of column names that are binary (containing only 0 and 1 (+NaNs))
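Example (a small synthetic frame for illustration):
>>> df = pd.DataFrame({"a": [0, 1, 0, 1], "b": [0.1, 0.9, 0.5, 1.0]})
>>> _detect_binary_columns(df, ["a", "b"])
['a']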
"""
binary_columns = []
for column in numerical_columns:
# checking for float and int as well as NaNs (this is safe since checked columns are numericals only)
# only columns that contain at least one 0 and one 1 are counted as binary (or 0.0/1.0)
if df[column].isin([0.0, 1.0, np.nan, 0, 1]).all() and df[column].nunique() == 2:
binary_columns.append(column)
return binary_columns
def _cast_obs_columns(obs: pd.DataFrame) -> pd.DataFrame:
"""Cast non numerical obs columns to either category or bool.
Args:
obs: Obs of an AnnData object
Returns:
The type-cast obs DataFrame.
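Example (a small synthetic obs frame for illustration):
>>> obs = pd.DataFrame({"survived": [True, False, True], "unit": ["ICU", "ER", "ICU"]})
>>> obs = _cast_obs_columns(obs)  # "unit" is cast to category, "survived" stays bool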
"""
# only cast non numerical columns
object_columns = list(obs.select_dtypes(exclude=["number", "category", "bool"]).columns)
# type cast each non-numerical column to either bool (if possible) or category else
obs[object_columns] = obs[object_columns].apply(
lambda obs_name: obs_name.astype("category")
if not set(pd.unique(obs_name)).issubset({False, True, np.nan})
else obs_name.astype("bool"),
axis=0,
)
return obs
def generate_anndata( # pragma: no cover
shape: tuple[int, int],
X_type=sparse.csr_matrix,
X_dtype=np.float32,
obsm_types: Collection = (sparse.csr_matrix, np.ndarray, pd.DataFrame),
varm_types: Collection = (sparse.csr_matrix, np.ndarray, pd.DataFrame),
layers_types: Collection = (sparse.csr_matrix, np.ndarray, pd.DataFrame),
include_nlp: bool = False,
) -> AnnData:
"""Generates a predefined AnnData with random values.
Args:
shape: Shape of the X matrix.
X_type: Type of the X matrix.
X_dtype: Data type of the X matrix.
obsm_types: Types of the obsm matrices.
varm_types: Types of the varm matrices.
layers_types: Types of additional layers.
include_nlp: Whether to include diseases for NLP in all of X, obs and var.
Sets the X_dtype to object by default and overwrites the passed X_dtype.
Returns:
A specified AnnData object.
Examples:
>>> import ehrapy as ep
>>> adata = ep.ad.generate_anndata((2, 2), include_nlp=True)
"""
example_diseases: list[str] = ["diabetes mellitus", "breast cancer", "dementia", "pneumonia"]
M, N = shape
obs_names = pd.Index(f"patient{i}" for i in range(M))
var_names = pd.Index(f"feature{i}" for i in range(N))
def _generate_typed_df(n_values, index=None, nlp: bool = False) -> pd.DataFrame:
"""Generates a typed DataFrame with categoricals and numericals.
Args:
n_values: Number of values to generate per type.
index: Name of the index column.
nlp: Whether to include disease names.
Returns:
Pandas DataFrame with the specified number of values.
"""
letters = np.fromiter(iter(ascii_letters), "U1")
if n_values > len(letters):
letters = letters[: n_values // 2] # Make sure categories are repeated
df = pd.DataFrame(
{
"cat": pd.Categorical(np.random.choice(letters, n_values)),
"cat_ordered": pd.Categorical(np.random.choice(letters, n_values), ordered=True),
"int64": np.random.randint(-50, 50, n_values),
"float64": np.random.random(n_values),
"uint8": np.random.randint(255, size=n_values, dtype="uint8"),
},
index=index,
)
if nlp:
df["nlp"] = random.sample(example_diseases, k=n_values)
return df
obs = _generate_typed_df(M, obs_names, nlp=include_nlp)
var = _generate_typed_df(N, var_names, nlp=include_nlp)
obs.rename(columns={"cat": "obs_cat"}, inplace=True)
var.rename(columns={"cat": "var_cat"}, inplace=True)
if X_type is None:
X = None
else:
if include_nlp:
X_np_array = np.random.binomial(100, 0.005, (M, N - 1)).astype(object)
X = np.append(X_np_array, [[el] for el in random.sample(example_diseases, k=M)], axis=1)
else:
X_np_array = np.random.binomial(100, 0.005, (M, N))
X = X_type(X_np_array).astype(X_dtype)
obsm = {
"array": np.random.random((M, 50)),
"sparse": sparse.random(M, 100, format="csr"),
"df": _generate_typed_df(M, obs_names),
}
obsm = {k: v for k, v in obsm.items() if type(v) in obsm_types}
varm = {
"array": np.random.random((N, 50)),
"sparse": sparse.random(N, 100, format="csr"),
"df": _generate_typed_df(N, var_names),
}
varm = {k: v for k, v in varm.items() if type(v) in varm_types}
layers = {"array": np.random.random((M, N)), "sparse": sparse.random(M, N, format="csr")}
layers = {k: v for k, v in layers.items() if type(v) in layers_types}
obsp = {"array": np.random.random((M, M)), "sparse": sparse.random(M, M, format="csr")}
varp = {"array": np.random.random((N, N)), "sparse": sparse.random(N, N, format="csr")}
def _generate_vstr_recarray(m, n, dtype=None):
size = m * n
lengths = np.random.randint(3, 5, size)
letters = np.array(list(ascii_letters))
gen_word = lambda w: "".join(np.random.choice(letters, w))
arr = np.array([gen_word(length) for length in lengths]).reshape(m, n)
return pd.DataFrame(arr, columns=[gen_word(5) for _ in range(n)]).to_records(index=False, column_dtypes=dtype)
uns = {
"O_recarray": _generate_vstr_recarray(N, 5),
"nested": {
"scalar_str": "str",
"scalar_int": 42,
"scalar_float": 3.0,
"nested_further": {"array": np.arange(5)},
},
}
if include_nlp:
X_dtype = np.dtype(object)
adata = AnnData(
X=X,
obs=obs,
var=var,
obsm=obsm,
varm=varm,
layers=layers,
obsp=obsp,
varp=varp,
uns=uns,
)
return adata
def get_obs_df( # pragma: no cover
adata: AnnData,
keys: Iterable[str] = (),
obsm_keys: Iterable[tuple[str, int]] = (),
*,
layer: str | None = None,
features: str | None = None,
):
"""Return values for observations in adata.
Args:
adata: AnnData object to get values from.
keys: Keys from either `.var_names`, `.var[features]`, or `.obs.columns`.
obsm_keys: Tuple of `(key from obsm, column index of obsm[key])`.
layer: Layer of `adata`.
features: Column of `adata.var` to search for `keys` in.
Returns:
A dataframe with `adata.obs_names` as index, and values specified by `keys` and `obsm_keys`.
Examples:
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> ages = ep.ad.get_obs_df(adata, keys=["age"])
"""
return obs_df(adata=adata, keys=keys, obsm_keys=obsm_keys, layer=layer, gene_symbols=features)
def get_var_df( # pragma: no cover
adata: AnnData,
keys: Iterable[str] = (),
varm_keys: Iterable[tuple[str, int]] = (),
*,
layer: str | None = None,
):
"""Return values for observations in adata.
Args:
adata: AnnData object to get values from.
keys: Keys from either `.obs_names` or `.var.columns`.
varm_keys: Tuple of `(key from varm, column index of varm[key])`.
layer: Layer of `adata`.
Returns:
A dataframe with `adata.var_names` as index, and values specified by `keys` and `varm_keys`.
Examples:
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> four_patients = ep.ad.get_var_df(adata, keys=["0", "1", "2", "3"])
"""
return var_df(adata=adata, keys=keys, varm_keys=varm_keys, layer=layer)
def get_rank_features_df(
adata: AnnData,
group: str | Iterable[str],
*,
key: str = "rank_features_groups",
pval_cutoff: float | None = None,
log2fc_min: float | None = None,
log2fc_max: float | None = None,
features: str | None = None,
):
""":func:`ehrapy.tl.rank_features_groups` results in the form of a :class:`~pandas.DataFrame`.
Args:
adata: AnnData object to get values from.
group: Which group (as in :func:`ehrapy.tl.rank_features_groups`'s `groupby` argument)
to return results from. Can be a list. All groups are returned if `group` is `None`.
key: Key differential groups were stored under.
pval_cutoff: Return only adjusted p-values below the cutoff.
log2fc_min: Minimum logfc to return.
log2fc_max: Maximum logfc to return.
features: Column name in `.var` DataFrame that stores gene symbols.
Specifying this will add that column to the returned DataFrame.
Returns:
A Pandas DataFrame of all rank features groups results.
Examples:
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> ep.tl.rank_features_groups(adata, "service_unit")
>>> df = ep.ad.get_rank_features_df(adata, group="FICU")
"""
return rank_genes_groups_df(
adata=adata,
group=group,
key=key,
pval_cutoff=pval_cutoff,
log2fc_min=log2fc_min,
log2fc_max=log2fc_max,
gene_symbols=features,
)
class NotEncodedError(AssertionError):
"""Raised when an operation requires an encoded AnnData object, but X has not been encoded yet."""
def _are_ndarrays_equal(arr1: np.ndarray, arr2: np.ndarray) -> np.bool_:
"""Check if two arrays are equal member-wise.
Note: Two NaNs are considered equal.
Args:
arr1: First array to compare
arr2: Second array to compare
Returns:
True if the two arrays are equal member-wise
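Example (illustrative; note the NaN-aware comparison):
>>> bool(_are_ndarrays_equal(np.array([1.0, np.nan]), np.array([1.0, np.nan])))
True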
"""
return np.all(np.equal(arr1, arr2, dtype=object) | ((arr1 != arr1) & (arr2 != arr2)))
def _is_val_missing(data: np.ndarray) -> np.ndarray[Any, np.dtype[np.bool_]]:
"""Check if values in a AnnData matrix are missing.
Args:
data: The AnnData matrix to check
Returns:
An array of bool representing the missingness of the original data, with the same shape
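Example (illustrative; None, empty strings and NaNs count as missing):
>>> _is_val_missing(np.array([["a", ""], [None, 1.5]], dtype=object)).tolist()
[[False, True], [True, False]]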
"""
return np.isin(data, [None, ""]) | (data != data)
def _to_dense_matrix(adata: AnnData, layer: str | None = None) -> np.ndarray: # pragma: no cover
"""Extract a layer from an AnnData object and convert it to a dense matrix if required.
Args:
adata: The AnnData where to extract the layer from.
layer: Name of the layer to extract. If omitted, X is considered.
Returns:
The layer as a dense matrix. If a conversion was required, this function returns a copy of the original layer,
otherwise this function returns a reference.
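Example (a minimal sketch using the encoded MIMIC-II demo data):
>>> import ehrapy as ep
>>> adata = ep.dt.mimic_2(encoded=True)
>>> dense_X = _to_dense_matrix(adata)  # densifies X; pass `layer=` to densify a specific layer instead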
"""
if layer is None:
return adata.X.toarray() if issparse(adata.X) else adata.X
else:
return adata.layers[layer].toarray() if issparse(adata.layers[layer]) else adata.layers[layer]