# Source code for ehrapy.anndata.anndata_ext

from __future__ import annotations

import random
from collections import OrderedDict
from string import ascii_letters
from typing import Collection, Iterable, NamedTuple, Sequence

import numpy as np
import pandas as pd
from anndata import AnnData, concat
from rich import print
from rich.text import Text
from rich.tree import Tree
from scanpy.get import obs_df, rank_genes_groups_df, var_df
from scipy import sparse
from scipy.sparse import issparse

from ehrapy import logging as logg


class BaseDataframes(NamedTuple):
    """Pair of dataframes produced when splitting an input table into AnnData components."""

    # columns destined for ``adata.obs``
    obs: pd.DataFrame
    # remaining columns, whose values become ``X`` and whose names become ``var`` index
    df: pd.DataFrame


def df_to_anndata(
    df: pd.DataFrame, columns_obs_only: list[str] | None = None, index_column: str | int | None = None
) -> AnnData:
    """Transform a given pandas dataframe into an AnnData object.

    Note that columns containing boolean values (either 0/1 or T(t)rue/F(f)alse)
    will be stored as boolean columns whereas the other non numerical columns will be stored as categorical values.

    Args:
        df: The pandas dataframe to be transformed
        columns_obs_only: An optional list of column names that should belong to obs only and not X
        index_column: The index column of obs. This can be either a column name (or its numerical index in the dataframe) or the index of the dataframe

    Returns:
        An AnnData object created from the given pandas dataframe

    Examples:
        >>> import ehrapy as ep
        >>> import pandas as pd
        >>> df = pd.DataFrame(
        ...     {
        ...         "patient_id": ["0", "1", "2", "3", "4"],
        ...         "age": [65, 72, 58, 78, 82],
        ...         "sex": ["M", "F", "F", "M", "F"],
        ...     }
        ... )
        >>> adata = ep.df_to_anndata(df, index_column="patient_id")
    """
    # resolve the requested index column; allow index 0, hence the explicit None check
    if index_column is not None:
        df_columns = list(df.columns)
        if index_column == df.index.name:
            # the dataframe index is already the requested index; nothing to do
            pass
        elif isinstance(index_column, str) and index_column in df_columns:
            df = df.set_index(index_column)
        elif isinstance(index_column, int) and index_column < len(df_columns):
            # numerical index: ensure it is in range before resolving it to a column name
            df = df.set_index(df_columns[index_column])
        else:
            # index_column is neither the index nor one of the columns
            raise ValueError(f"Did not find column {index_column} in neither index or columns!")

    # move columns from the input dataframe to obs
    if columns_obs_only:
        try:
            obs = df[columns_obs_only].copy()
            obs = obs.set_index(df.index.map(str))
            df = df.drop(columns_obs_only, axis=1)
        except KeyError as e:
            raise ValueError(
                "One or more column names passed to column_obs_only were not found in the input data. "
                "Are the column names spelled correctly?"
            ) from e
    else:
        obs = pd.DataFrame(index=df.index.map(str))
        logg.info("Added all columns to `obs`.")

    dataframes = BaseDataframes(obs, df)
    numerical_columns = list(dataframes.df.select_dtypes("number").columns)
    # if data is numerical only, short-circuit AnnData creation to have float dtype instead of object
    all_num = len(numerical_columns) == len(dataframes.df.columns)
    X = dataframes.df.to_numpy(copy=True)

    # initializing an OrderedDict with a non-empty dict might not be intended,
    # see: https://stackoverflow.com/questions/25480089/right-way-to-initialize-an-ordereddict-using-its-constructor-such-that-it-retain/25480206
    uns = OrderedDict()
    # store all numerical/non-numerical columns that are not obs only;
    # binary 0/1 columns are detected among the numerical ones and kept numerical
    binary_columns = _detect_binary_columns(df, numerical_columns)
    uns["numerical_columns"] = list(set(numerical_columns) | set(binary_columns))
    # everything that is not numerical is non-numerical (numerical is a subset of all columns)
    uns["non_numerical_columns"] = list(set(dataframes.df.columns) - set(uns["numerical_columns"]))

    # cast non numerical obs only columns to category or bool dtype, which is needed for writing to .h5ad files
    adata = AnnData(
        X=X,
        obs=_cast_obs_columns(dataframes.obs),
        var=pd.DataFrame(index=list(dataframes.df.columns)),
        dtype="float32" if all_num else "object",
        layers={"original": X.copy()},
        uns=uns,
    )
    logg.info(
        f"Transformed passed dataframe into an AnnData object with n_obs x n_vars = `{adata.n_obs}` x `{adata.n_vars}`."
    )

    return adata
def anndata_to_df(
    adata: AnnData, layer: str | None = None, obs_cols: list[str] | str | None = None, var_cols: list[str] | str | None = None
) -> pd.DataFrame:
    """Transform an AnnData object to a pandas dataframe.

    Args:
        adata: The AnnData object to be transformed into a pandas Dataframe
        layer: The layer to use for X.
        obs_cols: List of obs columns (or a single column name) to add to X.
        var_cols: List of var columns (or a single column name) to add to X.

    Returns:
        The AnnData object as a pandas Dataframe

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> df = ep.ad.anndata_to_df(adata)
    """
    if layer is not None:
        X = adata.layers[layer]
    else:
        X = adata.X
    if issparse(X):  # pragma: no cover
        X = X.toarray()

    df = pd.DataFrame(X, columns=list(adata.var_names))
    if obs_cols:
        if len(adata.obs.columns) == 0:
            raise ValueError("Cannot slice columns from empty obs!")
        if isinstance(obs_cols, str):
            # BUG FIX: previously `list(obs_cols)`, which split a single column
            # name into its characters (list("age") -> ['a', 'g', 'e'])
            obs_cols = [obs_cols]
        if isinstance(obs_cols, list):  # pragma: no cover
            obs_slice = adata.obs[obs_cols]
            # reset index needed since we slice all or at least some columns from obs DataFrame
            obs_slice = obs_slice.reset_index(drop=True)
            df = pd.concat([df, obs_slice], axis=1)
            logg.info(f"Added `{obs_cols}` columns to `X`.")
    if var_cols:
        if len(adata.var.columns) == 0:
            raise ValueError("Cannot slice columns from empty var!")
        if isinstance(var_cols, str):
            # BUG FIX: same character-splitting bug as for obs_cols above
            var_cols = [var_cols]
        if isinstance(var_cols, list):
            var_slice = adata.var[var_cols]
            # reset index needed since we slice all or at least some columns from var DataFrame
            var_slice = var_slice.reset_index(drop=True)
            df = pd.concat([df, var_slice], axis=1)

    return df
def move_to_obs(adata: AnnData, to_obs: list[str] | str, copy_obs: bool = False) -> AnnData:
    """Move inplace or copy features from X to obs.

    Note that columns containing boolean values (either 0/1 or True( true)/False(false))
    will be stored as boolean columns whereas the other non numerical columns will be stored as category.

    Args:
        adata: The AnnData object
        to_obs: The columns to move to obs
        copy_obs: The values are copied to obs (and therefore kept in X) instead of moved completely

    Returns:
        The original AnnData object with moved or copied columns from X to obs

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ep.ad.move_to_obs(adata, ['age'], copy_obs=False)
    """
    if isinstance(to_obs, str):  # pragma: no cover
        to_obs = [to_obs]

    # don't allow moving encoded columns as this could lead to inconsistent data in X and obs
    if any(column.startswith("ehrapycat") for column in to_obs):
        raise ValueError(
            "Cannot move encoded columns from X to obs. Either undo encoding or remove them from the list!"
        )

    if not all(elem in adata.var_names.values for elem in to_obs):
        raise ValueError(
            f"Columns `{[col for col in to_obs if col not in adata.var_names.values]}` are not in var_names."
        )

    if copy_obs:
        cols_to_obs_indices = adata.var_names.isin(to_obs)
        cols_to_obs = adata[:, cols_to_obs_indices].to_df()
        adata.obs = adata.obs.join(cols_to_obs)
        # restore numeric dtypes for the copied numerical columns
        # (dead code removed: the previous version also collected the non-numerical
        # columns into a list that was never used afterwards)
        num_set = set(adata.uns["numerical_columns"])
        var_num = [var for var in to_obs if var in num_set]
        adata.obs[var_num] = adata.obs[var_num].apply(pd.to_numeric, errors="ignore", downcast="float")
        adata.obs = _cast_obs_columns(adata.obs)
    else:
        cols_to_obs_indices = adata.var_names.isin(to_obs)
        df = adata[:, cols_to_obs_indices].to_df()
        # actually remove the moved columns from X/var
        adata._inplace_subset_var(~cols_to_obs_indices)
        adata.obs = adata.obs.join(df)
        # keep the numerical/non-numerical bookkeeping in uns consistent with the move
        updated_num_uns, updated_non_num_uns, num_var = _update_uns(adata, to_obs)
        adata.obs[num_var] = adata.obs[num_var].apply(pd.to_numeric, errors="ignore", downcast="float")
        adata.obs = _cast_obs_columns(adata.obs)
        adata.uns["numerical_columns"] = updated_num_uns
        adata.uns["non_numerical_columns"] = updated_non_num_uns

    logg.info(f"Added `{to_obs}` to `obs`.")

    return adata
def delete_from_obs(adata: AnnData, to_delete: list[str]) -> AnnData:
    """Delete features from obs.

    Args:
        adata: The AnnData object
        to_delete: The columns to delete from obs

    Returns:
        The original AnnData object with deleted columns from obs.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ep.ad.move_to_obs(adata, ['age'], copy_obs=True)
        >>> ep.ad.delete_from_obs(adata, ['age'])
    """
    if isinstance(to_delete, str):  # pragma: no cover
        to_delete = [to_delete]

    # every requested column must actually exist in obs
    missing = [col for col in to_delete if col not in adata.obs.columns.values]
    if missing:
        raise ValueError(f"Columns `{missing}` are not in obs.")

    # keep only the columns that were not requested for deletion
    keep_mask = ~adata.obs.columns.isin(to_delete)
    adata.obs = adata.obs.loc[:, keep_mask]

    logg.info(f"Removed `{to_delete}` from `obs`.")

    return adata
def move_to_x(adata: AnnData, to_x: list[str] | str) -> AnnData:
    """Move features from obs to X inplace.

    Args:
        adata: The AnnData object
        to_x: The columns to move to X

    Returns:
        A new AnnData object with moved columns from obs to X. This should not be used for datetime columns currently.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ep.ad.move_to_obs(adata, ['age'], copy_obs=False)
        >>> new_adata = ep.ad.move_to_x(adata, ['age'])
    """
    if isinstance(to_x, str):  # pragma: no cover
        to_x = [to_x]
    if not all(elem in adata.obs.columns.values for elem in to_x):
        raise ValueError(f"Columns `{[col for col in to_x if col not in adata.obs.columns.values]}` are not in obs.")
    # split the requested columns into those already present in X (skipped) and those to be moved
    cols_present_in_x = []
    cols_not_in_x = []
    for col in to_x:
        if col in set(adata.var_names):
            cols_present_in_x.append(col)
        else:
            cols_not_in_x.append(col)
    if cols_present_in_x:
        logg.info(
            f"Columns `{cols_present_in_x}` are already in X. Skipped moving `{cols_present_in_x}` to X. "
            f"If you want to permanently delete these columns from obs, please use the function delete_from_obs()."
        )
    if cols_not_in_x:
        # concatenating along axis=1 appends the obs columns as new variables of X
        new_adata = concat([adata, AnnData(adata.obs[cols_not_in_x], dtype="object")], axis=1)
        new_adata.obs = adata.obs[adata.obs.columns[~adata.obs.columns.isin(cols_not_in_x)]]
        # update uns (copy maybe: could be a costly operation but reduces reference cycles)
        # users might save those as separate AnnData object and this could be unexpected behaviour if we dont copy
        num_columns_moved, non_num_columns_moved, _ = _update_uns(adata, cols_not_in_x, True)
        new_adata.uns["numerical_columns"] = adata.uns["numerical_columns"] + num_columns_moved
        new_adata.uns["non_numerical_columns"] = adata.uns["non_numerical_columns"] + non_num_columns_moved
        logg.info(f"Added `{cols_not_in_x}` features to `X`.")
    else:
        # nothing to move: return the original object unchanged
        new_adata = adata

    return new_adata
def get_column_indices(adata: AnnData, col_names: str | Sequence[str]) -> list[int]:
    """Fetches the column indices in X for a given list of column names

    Args:
        adata: :class:`~anndata.AnnData` object
        col_names: Column names to extract the indices for

    Returns:
        Set of column indices

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ep.ad.get_column_indices(adata, ['age', 'gender_num', 'bmi'])
    """
    if isinstance(col_names, str):  # pragma: no cover
        col_names = [col_names]

    # positions (in var_names order) of every requested column
    return [position for position, var_name in enumerate(adata.var_names) if var_name in col_names]


def _get_column_values(adata: AnnData, indices: int | list[int]) -> np.ndarray:
    """Fetches the column values for a specific index from X

    Args:
        adata: :class:`~anndata.AnnData` object
        indices: The index to extract the values for

    Returns:
        :class:`~numpy.ndarray` object containing the column values
    """
    return np.take(adata.X, indices, axis=1)
def type_overview(data: AnnData, sort_by: str | None = None, sort_reversed: bool = False) -> None:  # pragma: no cover
    """Prints the current state of an :class:`~anndata.AnnData` object in a tree format.

    Output can be sorted by `dtype`, `order`, `num_cats` or left unsorted (`None`), which sorts by
    data type, lexicographical order and number of unique values (excluding NaN's) respectively.
    Note that sorting by `num_cats` only affects encoded variables currently and will display
    unencoded vars unsorted.

    Args:
        data: :class:`~anndata.AnnData` object to display
        sort_by: How the tree output should be sorted. One of `dtype`, `order`, `num_cats` or None (Defaults to None -> unsorted)
        sort_reversed: Whether to sort in reversed order or not

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ep.ad.type_overview(adata)
    """
    # guard clause: only AnnData objects can be displayed
    if not isinstance(data, AnnData):
        raise ValueError(f"Unable to present object of type {type(data)}. Can only display AnnData objects!")

    _adata_type_overview(data, sort_by, sort_reversed)
def _adata_type_overview(
    adata: AnnData, sort_by: str | None = None, sort_reversed: bool = False
) -> None:  # pragma: no cover
    """Display the :class:`~anndata.AnnData` object in its current state (encoded and unencoded variables, obs).

    Args:
        adata: The :class:`~anndata.AnnData` object to display
        sort_by: Whether to sort output or not (one of `order`, `dtype`, `num_cats` or None)
        sort_reversed: Whether to sort output in reversed order or not
    """
    tree = Tree(
        f"[b green]Variable names for AnnData object with {len(adata.obs_names)} obs and {len(adata.var_names)} vars",
        guide_style="underline2 bright_blue",
    )
    # encoded variables are only present once var_to_encoding has been written by the encode step
    if "var_to_encoding" in adata.uns.keys():
        original_values = adata.uns["original_values_categoricals"]
        branch = tree.add("🔐 Encoded variables", style="b green")
        dtype_dict = _infer_dtype_per_encoded_var(list(original_values.keys()), original_values)
        # sort encoded vars by lexicographical order of original values
        if sort_by == "order":
            encoded_list = sorted(original_values.keys(), reverse=sort_reversed)
            for categorical in encoded_list:
                branch.add(
                    f"[blue]{categorical} -> {dtype_dict[categorical][1]} categories;"
                    f" [green]{adata.uns['var_to_encoding'][categorical].replace('encoding', '').replace('_', ' ').strip()} [blue]encoded; [green]original data type: [blue]{dtype_dict[categorical][0]}"
                )
        # sort encoded vars by data type of the original values or the number of unique values in original data (excluding NaNs)
        elif sort_by == "dtype" or sort_by == "num_cats":
            # dtype_dict values are (dtype, n_categories) tuples; index 0 sorts by dtype, index 1 by category count
            sorted_by_type = {
                var: _type
                for var, _type in sorted(
                    dtype_dict.items(), key=lambda item: item[1][0 if sort_by == "dtype" else 1], reverse=sort_reversed
                )
            }
            for categorical in sorted_by_type:
                branch.add(
                    f"[blue]{categorical} -> {sorted_by_type[categorical][1]} categories;"
                    f" [green]{adata.uns['var_to_encoding'][categorical].replace('encoding', '').replace('_', ' ').strip()} [blue]encoded; [green]original data type: [blue]{sorted_by_type[categorical][0]}"
                )
        # display in unsorted order
        else:
            encoded_list = original_values.keys()
            for categorical in encoded_list:
                branch.add(
                    f"[blue]{categorical} -> {dtype_dict[categorical][1]} categories;"
                    f" [green]{adata.uns['var_to_encoding'][categorical].replace('encoding', '').replace('_', ' ').strip()} [blue]encoded; [green]original data type: [blue]{dtype_dict[categorical][0]}"
                )
    branch_num = tree.add(Text("🔓 Unencoded variables"), style="b green")

    if sort_by == "order":
        var_names = sorted(list(adata.var_names.values), reverse=sort_reversed)
        _sort_by_order_or_none(adata, branch_num, var_names)
    elif sort_by == "dtype":
        var_names = list(adata.var_names.values)
        _sort_by_type(adata, branch_num, var_names, sort_reversed)
    else:
        var_names = list(adata.var_names.values)
        _sort_by_order_or_none(adata, branch_num, var_names)

    if sort_by:
        logg.info(
            "Displaying AnnData object in sorted mode. Note that this might not be the exact same order of the variables in X or var are stored!"
        )
    print(tree)


def _sort_by_order_or_none(adata: AnnData, branch, var_names: list[str]):
    """Add branches to tree for sorting by order or unsorted."""
    var_names_val = list(adata.var_names.values)
    for other_vars in var_names:
        # encoded variables (prefixed "ehrapycat") are displayed in the encoded branch instead
        if not other_vars.startswith("ehrapycat"):
            idx = var_names_val.index(other_vars)
            unique_categoricals = pd.unique(adata.X[:, idx : idx + 1].flatten())
            data_type = pd.api.types.infer_dtype(unique_categoricals)
            branch.add(f"[blue]{other_vars} -> [green]data type: [blue]{data_type}")


def _sort_by_type(adata: AnnData, branch, var_names: list[str], sort_reversed: bool):
    """Sort tree output by datatype"""
    tmp_dict = {}
    var_names_val = list(adata.var_names.values)

    for other_vars in var_names:
        # encoded variables (prefixed "ehrapycat") are displayed in the encoded branch instead
        if not other_vars.startswith("ehrapycat"):
            idx = var_names_val.index(other_vars)
            unique_categoricals = pd.unique(adata.X[:, idx : idx + 1].flatten())
            data_type = pd.api.types.infer_dtype(unique_categoricals)
            tmp_dict[other_vars] = data_type

    # sort variables by the name of their inferred data type
    sorted_by_type = {
        var: _type for var, _type in sorted(tmp_dict.items(), key=lambda item: item[1], reverse=sort_reversed)
    }
    for var in sorted_by_type:
        branch.add(f"[blue]{var} -> [green]data type: [blue]{sorted_by_type[var]}")


def _infer_dtype_per_encoded_var(encoded_list: list[str], original_values) -> dict[str, tuple[str, int]]:
    """Infer dtype of each encoded variable of an AnnData object.

    Returns:
        Mapping of variable name to a ``(inferred dtype, number of unique non-NaN values)`` tuple.
    """
    dtype_dict = {}
    for categorical in encoded_list:
        unique_categoricals = pd.unique(original_values[categorical].flatten())
        categorical_type = pd.api.types.infer_dtype(unique_categoricals)
        # count unique values excluding NaNs
        num_unique_values = pd.DataFrame(unique_categoricals).dropna()[0].nunique()
        dtype_dict[categorical] = (categorical_type, num_unique_values)
    return dtype_dict


def _single_quote_string(name: str) -> str:  # pragma: no cover
    """Single quote a string to inject it into f-strings, since backslashes cannot be in double f-strings."""
    return f"'{name}'"


def _assert_encoded(adata: AnnData):
    """Raise :class:`NotEncodedError` if X is not fully numeric (i.e. not yet encoded)."""
    try:
        assert np.issubdtype(adata.X.dtype, np.number)
    except AssertionError:
        raise NotEncodedError("The AnnData object has not yet been encoded.") from AssertionError


def get_numeric_vars(adata: AnnData) -> list[str]:
    """Fetches the column names for numeric variables in X.

    Args:
        adata: :class:`~anndata.AnnData` object

    Returns:
        List of numeric column names

    Raises:
        NotEncodedError: If the AnnData object has not been encoded yet.
    """
    _assert_encoded(adata)
    # without the bookkeeping entry, all variables are assumed numeric (X is numeric at this point)
    if "numerical_columns" not in adata.uns_keys():
        return list(adata.var_names.values)
    else:
        return adata.uns["numerical_columns"]


def assert_numeric_vars(adata: AnnData, vars: Sequence[str]):
    """Raise a :class:`ValueError` unless every passed var is a numeric variable of X."""
    num_vars = get_numeric_vars(adata)

    try:
        assert set(vars) <= set(num_vars)
    except AssertionError:
        raise ValueError("Some selected vars are not numeric")


def set_numeric_vars(
    adata: AnnData, values: np.ndarray, vars: Sequence[str] | None = None, copy: bool = False
) -> AnnData | None:
    """Sets the numeric values in given column names in X.

    Args:
        adata: :class:`~anndata.AnnData` object
        values: Matrix containing the replacement values
        vars: List of names of the numeric variables to replace. If `None` they will be detected using :func:`~ehrapy.preprocessing.get_numeric_vars`.
        copy: Whether to return a copy with the normalized data.

    Returns:
        :class:`~anndata.AnnData` object with updated X

    Raises:
        TypeError: If ``values`` is not of a numeric dtype.
        ValueError: If the number of value columns does not match the number of vars.
    """
    _assert_encoded(adata)

    if vars is None:
        vars = get_numeric_vars(adata)
    else:
        assert_numeric_vars(adata, vars)

    if not np.issubdtype(values.dtype, np.number):
        raise TypeError(f"Values must be numeric (current dtype is {values.dtype})")

    n_values = values.shape[1]

    if n_values != len(vars):
        raise ValueError(f"Number of values ({n_values}) does not match number of vars ({len(vars)})")

    if copy:
        adata = adata.copy()

    # write each replacement column into its corresponding X column
    vars_idx = get_column_indices(adata, vars)
    for i in range(n_values):
        adata.X[:, vars_idx[i]] = values[:, i]

    logg.info(f"Values in columns {vars} were replaced by {values}.")

    return adata


def _update_uns(
    adata: AnnData, moved_columns: list[str], to_x: bool = False
) -> tuple[list[str], list[str], list[str] | None]:
    """Updates .uns of adata to reflect the changes made on the object by moving columns from X to obs or vice versa.

    1.) Moving `col1` from `X` to `obs`: `col1` is either numerical or non_numerical, so delete it from the corresponding entry in `uns`
    2.) Moving `col1` from `obs` to `X`: `col1` is either numerical or non_numerical, so add it to the corresponding entry in `uns`

    Args:
        adata: class:`~anndata.AnnData` object
        moved_columns: List of column names to be moved
        to_x: Whether to move from `obs` to `X` or vice versa

    Returns:
        Tuple of (updated numerical column names, updated non-numerical column names,
        numerical columns among the moved ones — or None when ``to_x`` is True).
    """
    moved_columns_set = set(moved_columns)
    if not to_x:
        # moving from `X` to `obs`, delete it from the corresponding entry in `uns`.
        num_set = set(adata.uns["numerical_columns"].copy())
        non_num_set = set(adata.uns["non_numerical_columns"].copy())
        var_num = []
        for var in moved_columns_set:
            if var in num_set:
                var_num.append(var)
                num_set -= {var}
            elif var in non_num_set:
                non_num_set -= {var}
        # NOTE(review): this message says "to `X`" although this branch handles the X -> obs
        # direction (and the `else` branch below says "to `obs`") — the two look swapped; confirm
        logg.info(f"Added `{moved_columns}` columns to `X`.")
        return list(num_set), list(non_num_set), var_num
    else:
        # moving from `obs` to `X`, add it to the corresponding entry in `uns`.
        all_moved_non_num_columns = moved_columns_set & set(adata.obs.select_dtypes(exclude="number").columns)
        # symmetric difference with a subset == set difference: the remaining moved columns are numerical
        all_moved_num_columns = list(moved_columns_set ^ all_moved_non_num_columns)
        logg.info(f"Added `{moved_columns}` columns to `obs`.")
        return all_moved_num_columns, list(all_moved_non_num_columns), None


def _detect_binary_columns(df: pd.DataFrame, numerical_columns: list[str]) -> list[str]:
    """Detect all columns that contain only 0 and 1 (besides NaNs).

    Args:
        df: The dataframe to check.
        numerical_columns: All numerical columns of the dataframe.

    Returns:
        List of column names that are binary (containing only 0 and 1 (+NaNs))
    """
    binary_columns = []
    for column in numerical_columns:
        # checking for float and int as well as NaNs (this is safe since checked columns are numericals only)
        # only columns that contain at least one 0 and one 1 are counted as binary (or 0.0/1.0)
        if df[column].isin([0.0, 1.0, np.NaN, 0, 1]).all() and df[column].nunique() == 2:
            binary_columns.append(column)

    return binary_columns


def _cast_obs_columns(obs: pd.DataFrame) -> pd.DataFrame:
    """Cast non numerical obs columns to either category or bool.

    Args:
        obs: Obs of an AnnData object

    Returns:
        The type casted obs.
    """
    # only cast non numerical columns
    object_columns = list(obs.select_dtypes(exclude=["number", "category", "bool"]).columns)
    # type cast each non numerical column to either bool (if possible) or category else
    obs[object_columns] = obs[object_columns].apply(
        lambda obs_name: obs_name.astype("category")
        if not set(pd.unique(obs_name)).issubset({False, True, np.NaN})
        else obs_name.astype("bool"),
        axis=0,
    )
    return obs


def generate_anndata(  # pragma: no cover
    shape: tuple[int, int],
    X_type=sparse.csr_matrix,
    X_dtype=np.float32,
    obsm_types: Collection = (sparse.csr_matrix, np.ndarray, pd.DataFrame),
    varm_types: Collection = (sparse.csr_matrix, np.ndarray, pd.DataFrame),
    layers_types: Collection = (sparse.csr_matrix, np.ndarray, pd.DataFrame),
    include_nlp: bool = False,
) -> AnnData:
    """Generates a predefined AnnData with random values.

    Args:
        shape: Shape of the X matrix.
        X_type: Type of the X matrix.
        X_dtype: Data type of the X matrix.
        obsm_types: Types of the obsm matrices.
        varm_types: Types of the varm matrices.
        layers_types: Types of additional layers.
        include_nlp: Whether to include diseases for NLP in all of X, obs and var.
                     Sets the X_dtype to object by default and overwrites the passed X_dtype.

    Returns:
        A specified AnnData object.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.ad.generate_anndata((2, 2), include_nlp=True)
    """
    example_diseases: list[str] = ["diabetes melitus", "breast cancer", "dementia", "pneumonia"]

    M, N = shape
    obs_names = pd.Index(f"patient{i}" for i in range(shape[0]))
    var_names = pd.Index(f"feature{i}" for i in range(shape[1]))

    def _generate_typed_df(n_values, index=None, nlp: bool = False) -> pd.DataFrame:
        """Generates a typed DataFrame with categoricals and numericals.

        Args:
            n_values: Number of values to generate per type.
            index: Name of the index column.
            nlp: Whether to include disease names.

        Returns:
            Pandas DataFrame with the specified number of values.
        """
        letters = np.fromiter(iter(ascii_letters), "U1")
        if n_values > len(letters):
            letters = letters[: n_values // 2]  # Make sure categories are repeated
        df = pd.DataFrame(
            dict(
                cat=pd.Categorical(np.random.choice(letters, n_values)),
                cat_ordered=pd.Categorical(np.random.choice(letters, n_values), ordered=True),
                int64=np.random.randint(-50, 50, n_values),
                float64=np.random.random(n_values),
                uint8=np.random.randint(255, size=n_values, dtype="uint8"),
            ),
            index=index,
        )
        if nlp:
            # NOTE(review): random.sample requires n_values <= len(example_diseases) (4); larger
            # shapes with include_nlp=True would raise here — confirm intended
            df["nlp"] = random.sample(example_diseases, k=n_values)

        return df

    obs = _generate_typed_df(M, obs_names, nlp=include_nlp)
    var = _generate_typed_df(N, var_names, nlp=include_nlp)

    # rename the plain "cat" columns so obs and var do not collide
    obs.rename(columns=dict(cat="obs_cat"), inplace=True)
    var.rename(columns=dict(cat="var_cat"), inplace=True)

    if X_type is None:
        X = None
    else:
        if include_nlp:
            # reserve the last column of X for free-text disease names
            X_np_array = np.random.binomial(100, 0.005, (M, N - 1)).astype(object)
            X = np.append(X_np_array, list(map(lambda el: [el], random.sample(example_diseases, k=M))), axis=1)
        else:
            X_np_array = np.random.binomial(100, 0.005, (M, N))
            X = X_type(X_np_array).astype(X_dtype)

    obsm = dict(
        array=np.random.random((M, 50)),
        sparse=sparse.random(M, 100, format="csr"),
        df=_generate_typed_df(M, obs_names),
    )
    # keep only the obsm entries whose type was requested
    obsm = {k: v for k, v in obsm.items() if type(v) in obsm_types}

    varm = dict(
        array=np.random.random((N, 50)),
        sparse=sparse.random(N, 100, format="csr"),
        df=_generate_typed_df(N, var_names),
    )
    varm = {k: v for k, v in varm.items() if type(v) in varm_types}

    layers = dict(array=np.random.random((M, N)), sparse=sparse.random(M, N, format="csr"))
    layers = {k: v for k, v in layers.items() if type(v) in layers_types}

    obsp = dict(array=np.random.random((M, M)), sparse=sparse.random(M, M, format="csr"))
    varp = dict(array=np.random.random((N, N)), sparse=sparse.random(N, N, format="csr"))

    def _generate_vstr_recarray(m, n, dtype=None):
        """Generate an (m, n) record array of random variable-length words."""
        size = m * n
        lengths = np.random.randint(3, 5, size)
        letters = np.array(list(ascii_letters))
        gen_word = lambda w: "".join(np.random.choice(letters, w))
        arr = np.array([gen_word(length) for length in lengths]).reshape(m, n)

        return pd.DataFrame(arr, columns=[gen_word(5) for _ in range(n)]).to_records(index=False, column_dtypes=dtype)

    uns = dict(
        O_recarray=_generate_vstr_recarray(N, 5),
        nested=dict(
            scalar_str="str",
            scalar_int=42,
            scalar_float=3.0,
            nested_further=dict(array=np.arange(5)),
        ),
    )

    if include_nlp:
        # free text forces an object dtype, overriding the passed X_dtype
        X_dtype = np.dtype(object)

    adata = AnnData(
        X=X,
        obs=obs,
        var=var,
        obsm=obsm,
        varm=varm,
        layers=layers,
        obsp=obsp,
        varp=varp,
        dtype=X_dtype,
        uns=uns,
    )
    logg.info(f"Generated an AnnData object with n_obs x n_vars = `{adata.n_obs}` x `{adata.n_vars}`.")

    return adata
def get_obs_df(  # pragma: no cover
    adata: AnnData,
    keys: Iterable[str] = (),
    obsm_keys: Iterable[tuple[str, int]] = (),
    *,
    layer: str | None = None,
    features: str | None = None,
):
    """Return values for observations in adata.

    Thin wrapper around :func:`scanpy.get.obs_df`.

    Args:
        adata: AnnData object to get values from.
        keys: Keys from either `.var_names`, `.var[gene_symbols]`, or `.obs.columns`.
        obsm_keys: Tuple of `(key from obsm, column index of obsm[key])`.
        layer: Layer of `adata`.
        features: Column of `adata.var` to search for `keys` in.

    Returns:
        A dataframe with `adata.obs_names` as index, and values specified by `keys` and `obsm_keys`.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ages = ep.ad.get_obs_df(adata, keys=["age"])
    """
    return obs_df(adata=adata, keys=keys, obsm_keys=obsm_keys, layer=layer, gene_symbols=features)
def get_var_df(  # pragma: no cover
    adata: AnnData,
    keys: Iterable[str] = (),
    varm_keys: Iterable[tuple[str, int]] = (),
    *,
    layer: str | None = None,
):
    """Return values for variables in adata.

    Thin wrapper around :func:`scanpy.get.var_df`.

    Args:
        adata: AnnData object to get values from.
        keys: Keys from either `.obs_names`, or `.var.columns`.
        varm_keys: Tuple of `(key from varm, column index of varm[key])`.
        layer: Layer of `adata`.

    Returns:
        A dataframe with `adata.var_names` as index, and values specified by `keys` and `varm_keys`.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> four_patients = ep.ad.get_var_df(adata, keys=["0", "1", "2", "3"])
    """
    return var_df(adata=adata, keys=keys, varm_keys=varm_keys, layer=layer)
def get_rank_features_df(
    adata: AnnData,
    group: str | Iterable[str],
    *,
    key: str = "rank_features_groups",
    pval_cutoff: float | None = None,
    log2fc_min: float | None = None,
    log2fc_max: float | None = None,
    features: str | None = None,
):
    """:func:`ehrapy.tl.rank_features_groups` results in the form of a :class:`~pandas.DataFrame`.

    Thin wrapper around :func:`scanpy.get.rank_genes_groups_df`.

    Args:
        adata: AnnData object to get values from.
        group: Which group (as in :func:`ehrapy.tl.rank_genes_groups`'s `groupby` argument)
               to return results from. Can be a list. All groups are returned if groups is `None`.
        key: Key differential groups were stored under.
        pval_cutoff: Return only adjusted p-values below the cutoff.
        log2fc_min: Minimum logfc to return.
        log2fc_max: Maximum logfc to return.
        features: Column name in `.var` DataFrame that stores gene symbols.
                  Specifying this will add that column to the returned DataFrame.

    Returns:
        A Pandas DataFrame of all rank genes groups results.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ep.tl.rank_features_groups(adata, "service_unit")
        >>> df = ep.ad.get_rank_features_df(adata, group="FICU")
    """
    return rank_genes_groups_df(
        adata=adata,
        group=group,
        key=key,
        pval_cutoff=pval_cutoff,
        log2fc_min=log2fc_min,
        log2fc_max=log2fc_max,
        gene_symbols=features,
    )
class NotEncodedError(AssertionError):
    """Raised when an operation requires an encoded AnnData object (numeric X) that has not been encoded yet."""

    pass