Source code for ehrapy.io._read

from __future__ import annotations

from pathlib import Path
from typing import Iterator, Literal

import fhiry.parallel as fp
import numpy as np
import pandas as pd
from collections import OrderedDict
from anndata import AnnData
from anndata import read as read_h5
from rich import print

from ehrapy import ehrapy_settings, settings
from ehrapy.anndata.anndata_ext import df_to_anndata
from ehrapy.data._dataloader import download, remove_archive_extension
from ehrapy.preprocessing._encode import encode


def read_csv(
    dataset_path: Path | str,
    sep: str = ",",
    index_column: dict[str, str | int] | str | int | None = None,
    columns_obs_only: dict[str, list[str]] | list[str] | None = None,
    columns_x_only: dict[str, list[str]] | list[str] | None = None,
    return_dfs: bool = False,
    cache: bool = False,
    backup_url: str | None = None,
    download_dataset_name: str | None = None,
    archive_format: Literal["zip", "tar", "tar.gz", "tgz"] | None = None,
    **kwargs,
) -> AnnData | dict[str, AnnData]:
    """Reads or downloads a desired directory of csv/tsv files or a single csv/tsv file.

    Args:
        dataset_path: Path to the file or directory to read.
        sep: Separator in the file. One of ',' (comma) or '\t' (tab). Defaults to ',' (comma).
        index_column: The index column of obs. Usually the patient visit ID or the patient ID.
        columns_obs_only: These columns will be added to obs only and not to X.
        columns_x_only: These columns will be added to X only and all remaining columns to obs.
            Note that datetime columns will always be added to .obs.
        return_dfs: Whether to return one or several Pandas DataFrames.
        cache: Whether to write to cache when reading or not. Defaults to False.
        backup_url: URL to download the data file(s) from, if the dataset is not yet on disk.
        download_dataset_name: Name of the file or directory after download.
        archive_format: Format of the downloaded archive, if any. One of 'zip', 'tar', 'tar.gz' or 'tgz'.

    Returns:
        An :class:`~anndata.AnnData` object or a dict with an identifier (the filename, without extension)
        for each :class:`~anndata.AnnData` object in the dict.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.io.read_csv("myfile.csv")
    """
    _check_columns_only_params(columns_obs_only, columns_x_only)
    dataset_path = Path(dataset_path)

    if not dataset_path.exists():
        dataset_path = _get_non_existing_files(dataset_path, download_dataset_name, backup_url, archive_format)

    adata = _read_csv(
        file_path=dataset_path,
        sep=sep,
        index_column=index_column,
        columns_obs_only=columns_obs_only,
        columns_x_only=columns_x_only,
        return_dfs=return_dfs,
        cache=cache,
        **kwargs,
    )

    return adata

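# A minimal usage sketch for reading a directory of csv files with per-file settings.
# The directory "ehr_dataset", the files "visits.csv"/"labs.csv" and all column names
# below are hypothetical and serve only as an illustration:
#
#   >>> import ehrapy as ep
#   >>> adatas = ep.io.read_csv(
#   ...     "ehr_dataset",
#   ...     index_column={"visits": "visit_id", "default": "patient_id"},
#   ...     columns_obs_only={"visits": ["free_text_note"]},
#   ... )
#   >>> adatas["labs"]  # dict mapping each file stem to an AnnData object
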
def _read_csv(
    file_path: Path,
    sep: str,
    index_column: dict[str, str | int] | str | int | None,
    columns_obs_only: dict[str, list[str]] | list[str] | None,
    columns_x_only: dict[str, list[str]] | list[str] | None,
    return_dfs: bool = False,
    cache: bool = False,
    **kwargs,
) -> AnnData | dict[str, AnnData] | dict[str, pd.DataFrame]:
    """Internal interface of the read_csv method."""
    if cache and return_dfs:
        raise CachingNotSupported("Caching is currently not supported for Pandas DataFrame objects.")
    if return_dfs and (columns_x_only or columns_obs_only):
        raise ValueError(
            "Parameters columns_x_only and columns_obs_only are not supported when returning Pandas DataFrames."
        )

    path_cache = settings.cachedir / file_path
    # reading from a cache file is handled by the read_h5ad function
    if cache and (path_cache.is_dir() or path_cache.is_file()):
        raise CacheExistsException(
            f"{path_cache} already exists. Use the read_h5ad function instead to read from cache!"
        )
    # if the file path is a directory, assume it is a dataset with multiple files
    elif file_path.is_dir():
        return _read_from_directory(
            file_path,
            cache,
            path_cache,
            extension=sep,
            index_column=index_column,
            columns_obs_only=columns_obs_only,
            columns_x_only=columns_x_only,
            return_dfs=return_dfs,
        )
    # input is a single file
    else:
        if sep not in {",", "\t"}:
            raise ValueError("Please provide one of the available separators , or tab")
        adata, columns_obs_only = _do_read_csv(
            file_path, sep, index_column, columns_obs_only, columns_x_only, cache, **kwargs  # type: ignore
        )
        # cache results if desired
        if cache:
            if not path_cache.parent.is_dir():
                path_cache.parent.mkdir(parents=True)
            return _write_cache(adata, path_cache, columns_obs_only)  # type: ignore

    return adata

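# A sketch of the intended cache round trip, assuming a local "myfile.csv": reading with
# cache=True writes an encoded .h5ad copy below settings.cachedir, and subsequent reads
# should go through read_h5ad on that cached file (a repeated
# read_csv(..., cache=True) raises CacheExistsException instead):
#
#   >>> import ehrapy as ep
#   >>> from ehrapy import settings
#   >>> adata = ep.io.read_csv("myfile.csv", cache=True)  # writes settings.cachedir / "myfile.h5ad"
#   >>> adata_cached = ep.io.read_h5ad(settings.cachedir / "myfile.h5ad")
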
def read_h5ad(
    dataset_path: Path | str,
    backup_url: str | None = None,
    download_dataset_name: str | None = None,
    archive_format: Literal["zip", "tar", "tar.gz", "tgz"] | None = None,
) -> AnnData | dict[str, AnnData]:
    """Reads or downloads a desired directory of h5ad files or a single h5ad file.

    Args:
        dataset_path: Path to the file or directory to read.
        backup_url: URL to download the data file(s) from if not yet existing.
        download_dataset_name: Name of the file or directory in case the dataset is downloaded.
        archive_format: Format of the downloaded archive, if any. One of 'zip', 'tar', 'tar.gz' or 'tgz'.

    Returns:
        An :class:`~anndata.AnnData` object or a dict with an identifier (the filename, without extension)
        for each :class:`~anndata.AnnData` object in the dict.

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.dt.mimic_2(encoded=True)
        >>> ep.io.write("mimic_2.h5ad", adata)
        >>> adata_2 = ep.io.read_h5ad("mimic_2.h5ad")
    """
    file_path: Path = Path(dataset_path)

    if not file_path.exists():
        file_path = _get_non_existing_files(file_path, download_dataset_name, backup_url, archive_format=archive_format)

    if file_path.is_dir():
        adata = _read_from_directory(file_path, False, None, "h5ad")
    else:
        adata = _do_read_h5ad(file_path)

    return adata

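# A sketch of the download path shared by all readers: when the target does not exist
# locally, it is fetched from backup_url into ehrapy_settings.datasetdir first, so
# dataset_path conventionally points there. The URL and file name below are placeholders:
#
#   >>> import ehrapy as ep
#   >>> from ehrapy import ehrapy_settings
#   >>> adata = ep.io.read_h5ad(
#   ...     f"{ehrapy_settings.datasetdir}/example.h5ad",
#   ...     backup_url="https://example.com/example.h5ad",  # hypothetical URL
#   ...     download_dataset_name="example.h5ad",
#   ... )
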
def _read_from_directory(
    file_path: Path,
    cache: bool,
    path_cache_dir: Path | None,
    extension: str,
    index_column: dict[str, str | int] | str | int | None = None,
    columns_obs_only: dict[str, list[str]] | list[str] | None = None,
    columns_x_only: dict[str, list[str]] | list[str] | None = None,
    return_dfs: bool = False,
) -> dict[str, AnnData] | dict[str, pd.DataFrame]:
    """Parse AnnData objects or Pandas DataFrames from a directory containing the data files."""
    if return_dfs:
        dfs = _read_multiple_csv(file_path, sep=extension, return_dfs=True)
        return dfs  # type: ignore
    if extension in {",", "\t"}:
        adata_objects, columns_obs_only = _read_multiple_csv(  # type: ignore
            file_path,
            sep=extension,
            index_column=index_column,
            columns_obs_only=columns_obs_only,
            columns_x_only=columns_x_only,
            return_dfs=False,
        )
        # cache results
        if cache:
            if not path_cache_dir.parent.is_dir():
                path_cache_dir.parent.mkdir(parents=True)
            path_cache_dir.mkdir()
            return _write_cache_dir(adata_objects, path_cache_dir, columns_obs_only, index_column)  # type: ignore
        return adata_objects  # type: ignore
    elif extension == "h5ad":
        return _read_multiple_h5ad(file_path)
    else:
        raise NotImplementedError(f"Reading from directory with .{extension} files is not implemented yet!")


def _read_multiple_csv(  # noqa: N802
    file_path: Path,
    sep: str,
    index_column: dict[str, str | int] | str | int | None = None,
    columns_obs_only: dict[str, list[str]] | list[str] | None = None,
    columns_x_only: dict[str, list[str]] | list[str] | None = None,
    return_dfs: bool = False,
    cache: bool = False,
    **kwargs,
) -> tuple[dict[str, AnnData], dict[str, list[str] | None]] | dict[str, pd.DataFrame]:
    """Read a dataset containing multiple .csv/.tsv files.

    Args:
        file_path: File path to the directory containing multiple .csv/.tsv files.
        sep: Either ',' or '\t' to determine which files to read.
        index_column: Column names of the index columns for obs.
        columns_obs_only: List of columns per file (AnnData object) which should only be stored in .obs, but not in X.
            Useful for free text annotations.
        columns_x_only: List of columns per file (AnnData object) which should only be stored in X, but not in obs.
            Datetime columns will be added to .obs regardless.
        return_dfs: When set to True, return a dictionary of Pandas DataFrames.
        cache: Whether to cache results or not.
        kwargs: Keyword arguments for Pandas read_csv.

    Returns:
        A dict mapping the filename (object name) to the corresponding :class:`~anndata.AnnData` object
        and the columns that are obs only for each object.
    """
    obs_only_all = {}
    if return_dfs:
        df_dict: dict[str, pd.DataFrame] = {}
    else:
        anndata_dict = {}
    for file in file_path.iterdir():
        if file.is_file() and file.suffix in {".csv", ".tsv"}:
            # slice off the file suffix .csv or .tsv for a clean file name
            file_identifier = file.stem
            if return_dfs:
                df = pd.read_csv(file, sep=sep, **kwargs)
                df_dict[file_identifier] = df
                continue
            index_col, col_obs_only, col_x_only = _extract_index_and_columns_obs_only(
                file_identifier, index_column, columns_obs_only, columns_x_only
            )
            adata, single_adata_obs_only = _do_read_csv(file, sep, index_col, col_obs_only, col_x_only, cache=cache)
            obs_only_all[file_identifier] = single_adata_obs_only
            # obs indices have to be unique, otherwise updating and working with the object will fail
            if index_col:
                adata.obs_names_make_unique()
            anndata_dict[file_identifier] = adata
    if return_dfs:
        return df_dict
    else:
        return anndata_dict, obs_only_all


def _do_read_csv(
    file_path: Path | Iterator[str],
    delimiter: str | None = ",",
    index_column: str | int | None = None,
    columns_obs_only: list[str] | None = None,
    columns_x_only: list[str] | None = None,
    cache: bool = False,
    **kwargs,
) -> tuple[AnnData, list[str] | None]:
    """Read a `.csv` or `.tsv` file.

    Args:
        file_path: File path to the csv file.
        delimiter: Delimiter separating the csv data within the file.
        index_column: Index or column name of the index column (obs).
        columns_obs_only: List of columns which should only be stored in .obs, but not in X. Useful for free text annotations.
        columns_x_only: List of columns which should only be stored in X, but not in .obs.
        cache: Whether the data should be written to cache or not.

    Returns:
        An :class:`~anndata.AnnData` object and the obs only columns for the object.
    """
    try:
        if index_column and columns_obs_only and index_column in columns_obs_only:
            print(
                f"[bold yellow]Index column [blue]{index_column} [yellow]is also used as a column "
                f"for obs only. Using default indices instead and moving [blue]{index_column} [yellow]to columns_obs_only."
            )
            index_column = None
        initial_df = pd.read_csv(file_path, delimiter=delimiter, index_col=index_column, **kwargs)
    # in case the index column is misspelled or does not exist
    except ValueError:
        raise IndexNotFoundError(
            f"Could not create AnnData object while reading file {file_path}. Does index_column named {index_column} "
            f"exist in {file_path}?"
        ) from None

    initial_df, columns_obs_only = _prepare_dataframe(initial_df, columns_obs_only, columns_x_only, cache)

    return df_to_anndata(initial_df, columns_obs_only), columns_obs_only


def _read_multiple_h5ad(  # noqa: N802
    file_path: Path,
) -> dict[str, AnnData]:
    """Read a dataset containing multiple .h5ad files.

    Args:
        file_path: File path to the directory containing multiple .h5ad files.

    Returns:
        A dict mapping the filename (object name) to the corresponding :class:`~anndata.AnnData` object.
    """
    anndata_dict = {}
    for file in file_path.iterdir():
        if file.is_file() and file.suffix == ".h5ad":
            # slice off the file suffix .h5ad for a clean file name
            adata_identifier = file.stem
            adata = _do_read_h5ad(file)
            anndata_dict[adata_identifier] = adata

    return anndata_dict


def _do_read_h5ad(file_path: Path | Iterator[str]) -> AnnData:
    """Read from a h5ad file.

    Args:
        file_path: Path to the h5ad file.

    Returns:
        An AnnData object.
    """
    adata = read_h5(file_path)
    if "ehrapy_dummy_encoding" in adata.uns.keys():
        # if dummy encoding was needed, the original dtype of X could not be numerical, so cast it to object
        adata.X = adata.X.astype("object")
        decoded_adata = _decode_cached_adata(adata, list(adata.uns["columns_obs_only"]))
        return decoded_adata
    return adata

def read_fhir(
    dataset_path: str,
    format: Literal["json", "ndjson"] = "json",
    columns_obs_only: list[str] | None = None,
    columns_x_only: list[str] | None = None,
    return_df: bool = False,
    cache: bool = False,
    backup_url: str | None = None,
    index_column: str | int | None = None,
    download_dataset_name: str | None = None,
    archive_format: Literal["zip", "tar", "tar.gz", "tgz"] | None = None,
) -> pd.DataFrame | AnnData:
    """Reads one or multiple FHIR files using fhiry.

    Uses https://github.com/dermatologist/fhiry to read the FHIR file into a Pandas DataFrame
    which is subsequently transformed into an AnnData object.

    Args:
        dataset_path: Path to one or multiple FHIR files.
        format: The file format of the FHIR data. One of 'json' or 'ndjson'. Defaults to 'json'.
        columns_obs_only: These columns will be added to obs only and not to X.
        columns_x_only: These columns will be added to X only and all remaining columns to obs.
            Note that datetime columns will always be added to .obs.
        return_df: Whether to return a Pandas DataFrame instead of an AnnData object.
        cache: Whether to write to cache when reading or not. Defaults to False.
        backup_url: URL to download the data file(s) from if not yet existing.
        index_column: The index column for the generated object. Usually the patient or visit ID.
        download_dataset_name: Name of the file or directory in case the dataset is downloaded.
        archive_format: Format of the downloaded archive, if any. One of 'zip', 'tar', 'tar.gz' or 'tgz'.

    Returns:
        A Pandas DataFrame or :class:`~anndata.AnnData` object of the read in FHIR file(s).

    Examples:
        >>> import ehrapy as ep
        >>> adata = ep.io.read_fhir("/path/to/fhir/resources")
    """
    _check_columns_only_params(columns_obs_only, columns_x_only)
    file_path: Path = Path(dataset_path)

    if not file_path.exists():
        file_path = _get_non_existing_files(file_path, download_dataset_name, backup_url, archive_format)

    adata = _read_fhir(
        file_path=str(file_path.resolve()),
        format=format,
        index_column=index_column,
        columns_obs_only=columns_obs_only,
        columns_x_only=columns_x_only,
        return_df=return_df,
        cache=cache,
    )

    return adata

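# A minimal sketch for the DataFrame path of read_fhir (the path is hypothetical):
# with return_df=True, the fhiry output is returned directly after _prepare_dataframe,
# without constructing an AnnData object:
#
#   >>> import ehrapy as ep
#   >>> df = ep.io.read_fhir("/path/to/fhir/resources", format="ndjson", return_df=True)
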
def _read_fhir(
    file_path: str,
    format: Literal["json", "ndjson"],
    index_column: dict[str, str | int] | str | int | None,
    columns_obs_only: list[str] | None,
    columns_x_only: list[str] | None,
    return_df: bool = False,
    cache: bool = False,
) -> pd.DataFrame | AnnData:
    """Internal interface of the read_fhir method."""
    if cache and return_df:
        raise CachingNotSupported("Caching is currently not supported for Pandas DataFrame objects.")
    if return_df and (columns_x_only or columns_obs_only):
        raise ValueError(
            "Parameters columns_x_only and columns_obs_only are not supported when returning Pandas DataFrames."
        )
    path_cache = settings.cachedir / file_path
    if cache and (path_cache.is_dir() or path_cache.is_file()):
        raise CacheExistsException(
            f"{path_cache} already exists. Use the read_h5ad function instead to read from cache!"
        )

    if format == "json":
        df = fp.process(file_path)
    elif format == "ndjson":
        df = fp.ndjson(file_path)
    else:
        raise ValueError("Only folders containing json and ndjson in FHIR format are supported.")

    df, columns_obs_only = _prepare_dataframe(df, columns_obs_only, columns_x_only, cache)
    if index_column:
        df = df.set_index(index_column)

    if return_df:
        return df
    else:
        adata = df_to_anndata(df, columns_obs_only)
        if cache:
            if not path_cache.parent.is_dir():
                path_cache.parent.mkdir(parents=True)
            return _write_cache(adata, path_cache, columns_obs_only)  # type: ignore
        return adata


def _get_non_existing_files(
    dataset_path: Path,
    download_dataset_name: str,
    backup_url: str,
    archive_format: Literal["zip", "tar", "tar.gz", "tgz"] | None = None,
) -> Path:
    """Handle non-existing files or directories by trying to download from a backup_url and moving them into the correct directory.

    Returns:
        The file or directory path of the downloaded content.
    """
    if backup_url is None and not dataset_path.exists():
        raise ValueError(
            f"File or directory {dataset_path} does not exist and no backup_url was provided.\n"
            f"Please provide a backup_url or check whether path is spelled correctly."
        )
    print("[bold yellow]Path or dataset does not yet exist. Attempting to download...")
    download(
        backup_url,
        output_file_name=download_dataset_name,
        output_path=ehrapy_settings.datasetdir,
        archive_format=archive_format,
    )
    if archive_format:
        dataset_path = remove_archive_extension(dataset_path)

    return dataset_path


def _read_from_cache_dir(cache_dir: Path) -> dict[str, AnnData]:
    """Read AnnData objects from the cache directory."""
    adata_objects = {}
    # read each cache file in the cache directory and store it into a dict
    for cache_file in cache_dir.iterdir():
        if cache_file.name.endswith(".h5ad"):
            adata_objects[cache_file.stem] = _read_from_cache(cache_file)

    return adata_objects


def _read_from_cache(path_cache: Path) -> AnnData:
    """Read an AnnData object from a cached file."""
    cached_adata = read_h5(path_cache)
    # type cast required when dealing with non-numerical data; otherwise all values in X would be treated as strings
    if not np.issubdtype(cached_adata.X.dtype, np.number):
        cached_adata.X = cached_adata.X.astype("object")
    try:
        columns_obs_only = list(cached_adata.uns["columns_obs_only"])
        del cached_adata.uns["columns_obs_only"]
    # in case columns_obs_only has not been passed
    except KeyError:
        columns_obs_only = []
    # required since reading from cache returns a numpy array instead of a list here
    cached_adata.uns["numerical_columns"] = list(cached_adata.uns["numerical_columns"])
    # recreate the original AnnData object with the index column for obs and the obs only columns
    cached_adata = _decode_cached_adata(cached_adata, columns_obs_only)

    return cached_adata


def _write_cache_dir(
    adata_objects: dict[str, AnnData],
    path_cache: Path,
    columns_obs_only,
    index_column: dict[str, str | int] | None,
) -> dict[str, AnnData]:
    """Write multiple AnnData objects into a common cache directory, keeping index column and columns_obs_only.

    Args:
        adata_objects: A dictionary with an identifier as key for each of the AnnData objects.
        path_cache: Path to the cache directory.
        columns_obs_only: Columns for obs only.
        index_column: The index columns for each object (if any).

    Returns:
        A dict containing a unique identifier and an :class:`~anndata.AnnData` object for each file read.
    """
    for identifier in adata_objects:
        # for each identifier (for the AnnData object), we need the index column and obs only columns (if any) for reuse when reading cache
        index_col, cols_obs_only, _ = _extract_index_and_columns_obs_only(identifier, index_column, columns_obs_only)
        adata_objects[identifier] = _write_cache(
            adata_objects[identifier], path_cache / (identifier + ".h5ad"), cols_obs_only
        )
    return adata_objects


def _write_cache(
    raw_anndata: AnnData,
    path_cache: Path,
    columns_obs_only: list[str] | None,
) -> AnnData:
    """Write an AnnData object to cache."""
    original_x_dtype = raw_anndata.X.dtype
    if not np.issubdtype(original_x_dtype, np.number):
        cached_adata = encode(data=raw_anndata, autodetect=True)
    else:
        cached_adata = raw_anndata
    # temporary key that stores all column names that are obs only for this AnnData object
    cached_adata.uns["columns_obs_only"] = columns_obs_only
    cached_adata.uns["ehrapy_dummy_encoding"] = True
    # append the correct file suffix
    if not path_cache.suffix == ".h5ad":
        if path_cache.suffix in {".tsv", ".csv"}:
            path_cache = path_cache.with_suffix(".h5ad")
        else:
            path_cache = Path(str(path_cache) + ".h5ad")
    cached_adata.write(path_cache)
    # preserve the original dtype of X (either numerical or object)
    cached_adata.X = cached_adata.X.astype(original_x_dtype)
    cached_adata = _decode_cached_adata(cached_adata, columns_obs_only)

    return cached_adata


def _prepare_dataframe(initial_df: pd.DataFrame, columns_obs_only, columns_x_only=None, cache=False):
    """Prepare the dataframe to be cast into an AnnData object.

    Datetime columns will be detected and added to columns_obs_only.

    Returns:
        The initially parsed dataframe and an updated list of columns_obs_only.
    """
    # when passing columns_x_only, simply treat the (asymmetric) difference as obs only; everything else is kept in X
    if columns_x_only:
        columns_obs_only = list(set(initial_df.columns) - set(columns_x_only))
    # get all object dtype columns
    object_type_columns = [col_name for col_name in initial_df.columns if initial_df[col_name].dtype == "object"]
    # if columns_obs_only is None, initialize it, as datetime columns need to be included here
    if not columns_obs_only:
        columns_obs_only = []
    no_datetime_object_col = []
    for col in object_type_columns:
        try:
            pd.to_datetime(initial_df[col], format="mixed")
            # only add to columns_obs_only if not present already to avoid duplicates
            if col not in columns_obs_only:
                columns_obs_only.append(col)
        except (ValueError, TypeError):
            # we only need to replace NaNs in non-datetime, non-numerical columns since datetime columns are obs only by default
            no_datetime_object_col.append(col)
    # writing to h5ad files requires non-string values to be empty in non-numerical columns
    if cache:  # TODO: remove this when anndata 0.8.0 is released
        initial_df[no_datetime_object_col] = initial_df[no_datetime_object_col].fillna("")
    # temporary workaround needed; see https://github.com/theislab/anndata/issues/504 and https://github.com/theislab/anndata/issues/662
    # converting booleans to strings is needed for caching, as writing to .h5ad files currently does not support boolean values
    bool_columns = {column_name: "str" for column_name in initial_df.columns if initial_df.dtypes[column_name] == "bool"}
    initial_df = initial_df.astype(bool_columns)

    return initial_df, columns_obs_only

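# A small sketch of the datetime detection above (hypothetical data; format="mixed"
# requires pandas >= 2.0): object columns that parse as datetimes are appended to
# columns_obs_only, all others are left for X:
#
#   >>> import pandas as pd
#   >>> df = pd.DataFrame({"admitted": ["2021-01-01", "2021-02-03"], "note": ["ok", "bad"]})
#   >>> _, obs_only = _prepare_dataframe(df, columns_obs_only=None)
#   >>> obs_only  # "admitted" parses as datetime, "note" does not
#   ['admitted']
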
""" # when passing columns x only, simply handle the (asymmetric) difference to be obs only and everything else is kept in X if columns_x_only: columns_obs_only = list(set(initial_df.columns) - set(columns_x_only)) # get all object dtype columns object_type_columns = [col_name for col_name in initial_df.columns if initial_df[col_name].dtype == "object"] # if columns_obs_only is None, initialize it as datetime columns need to be included here if not columns_obs_only: columns_obs_only = [] no_datetime_object_col = [] for col in object_type_columns: try: pd.to_datetime(initial_df[col], format="mixed") # only add to column_obs_only if not present already to avoid duplicates if col not in columns_obs_only: columns_obs_only.append(col) except (ValueError, TypeError): # we only need to replace NANs on non datetime, non numerical columns since datetime are obs only by default no_datetime_object_col.append(col) # writing to hd5a files requires non string to be empty in non numerical columns if cache: # TODO remove this when anndata 0.8.0 is released initial_df[no_datetime_object_col] = initial_df[no_datetime_object_col].fillna("") # temporary workaround needed; see https://github.com/theislab/anndata/issues/504 and https://github.com/theislab/anndata/issues/662 # converting booleans to strings is needed for caching as writing to .h5ad files currently does not support writing boolean values bool_columns = { column_name: "str" for column_name in initial_df.columns if initial_df.dtypes[column_name] == "bool" } initial_df = initial_df.astype(bool_columns) return initial_df, columns_obs_only def _decode_cached_adata(adata: AnnData, column_obs_only: list[str]) -> AnnData: """Decode the label encoding of initial AnnData object Args: adata: The label encoded AnnData object column_obs_only: The columns, that should be kept in obs Returns: The decoded, initial AnnData object """ var_names = list(adata.var_names) # for each encoded categorical, replace its encoded values with its original values in X for idx, var_name in enumerate(var_names): if not var_name.startswith("ehrapycat_"): break value_name = var_name[10:] original_values = adata.uns["original_values_categoricals"][value_name] adata.X[:, idx : idx + 1] = original_values # update var name per categorical var_names[idx] = value_name # drop all columns, that are not obs only in obs if column_obs_only: adata.obs = adata.obs[column_obs_only] else: adata.obs = pd.DataFrame(index=adata.obs.index) # set the new var names (unencoded ones) adata.var.index = var_names adata.layers["original"] = adata.X.copy() # reset uns but keep numerical columns numerical_columns = adata.uns["numerical_columns"] adata.uns = OrderedDict() adata.uns["numerical_columns"] = numerical_columns adata.uns["non_numerical_columns"] = list(set(adata.var_names) ^ set(numerical_columns)) return adata def _extract_index_and_columns_obs_only(identifier: str, index_columns, columns_obs_only, columns_x_only=None): """Extract the index column (if any) and the columns, for obs only (if any) from the given user input. For each file, `index_columns` and `columns_obs_only` can provide three cases: 1.) The filename (thus the identifier) is not present as a key and no default key is provided or one or both dicts are empty: --> No index column will be set and/or no columns are obs only (based on user input) .. code-block:: python # some setup code here ... 
# filename identifier1 = "MyFile" identifier2 = "MyOtherFile" # no default key and identifier1 is not in the index or columns_obs_only keys # -> no index column will be set and no columns will be obs only (except datetime, if any) index_columns = {"MyOtherFile":"MyOtherColumn1"} columns_obs_only = {"MyOtherFile":["MyOtherColumn2"]} 2.) The filename (thus the identifier) is not present as a key, but default key is provided --> The index column will be set and/or columns will be obs only according to the default key .. code-block:: python # some setup code here ... # filename identifier1 = "MyFile" identifier2 = "MyOtherFile" # identifier1 is not in the index or columns_obs_only keys, but default key is set for both # -> index column will be set using MyColumn1 and column obs only will include MyColumn2 index_columns = {"MyOtherFile":"MyOtherColumn1", "default": "MyColumn1"} columns_obs_only = {"MyOtherFile":["MyOtherColumn2"], "default": ["MyColumn2"]} 3.) The filename is present as a key --> The index column will be set and/or columns are obs only according to its value .. code-block:: python # some setup code here ... # filename identifier1 = "MyFile" identifier2 = "MyOtherFile" # identifier1 is in the index and columns_obs_only keys # -> index column will be MyColumn1 and columns_obs_only will include MyColumn2 and MyColumn3 index_columns = {"MyFile":"MyColumn1"} columns_obs_only = {"MyFile":["MyColumn2", "MyColumn3"]} Args: identifier: The name of the index_columns: Index columns columns_obs_only: Columns for obs only Returns: Index column (if any) and columns obs only (if any) for this specific AnnData object """ _index_column = None _columns_obs_only = None _columns_x_only = None # get index column (if any) if index_columns and identifier in index_columns.keys(): _index_column = index_columns[identifier] elif index_columns and "default" in index_columns.keys(): _index_column = index_columns["default"] # get columns obs only (if any) if columns_obs_only and identifier in columns_obs_only.keys(): _columns_obs_only = columns_obs_only[identifier] elif columns_obs_only and "default" in columns_obs_only.keys(): _columns_obs_only = columns_obs_only["default"] # get columns x only (if any) if columns_x_only and identifier in columns_x_only.keys(): _columns_x_only = columns_x_only[identifier] elif columns_x_only and "default" in columns_x_only.keys(): _columns_x_only = columns_x_only["default"] # if index column is also found in column_obs_only or x_only, use default indices instead and only move it to obs/X, but warn the user if (_index_column and _columns_obs_only or _index_column and _columns_x_only) and ( _index_column in _columns_obs_only or _index_column in _columns_x_only ): print( f"[bold yellow]Index column [blue]{_index_column} [yellow]for file [blue]{identifier} [yellow]is also used as a column " f"for obs or X only. Using default indices instead and moving [blue]{_index_column} [yellow]to obs/X!." ) _index_column = None return _index_column, _columns_obs_only, _columns_x_only def _check_columns_only_params( obs_only: dict[str, list[str]] | list[str] | None, x_only: dict[str, list[str]] | list[str] | None ) -> None: """Check whether columns_obs_only and columns_x_only are passed exclusively. For a single AnnData object (thus parameters being a list of strings) it's not desirable to pass both, obs_only and x_only. For multiple AnnData objects (thus the parameters being dicts of string keys with a list value), it is possible to pass both. 
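# A direct-call sketch of the lookup behavior above (all identifiers hypothetical):
# "MyFile" is missing from index_columns, so the "default" key applies, while
# columns_obs_only matches the identifier directly:
#
#   >>> _extract_index_and_columns_obs_only(
#   ...     "MyFile",
#   ...     index_columns={"default": "patient_id"},
#   ...     columns_obs_only={"MyFile": ["note"]},
#   ... )
#   ('patient_id', ['note'], None)
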
def _check_columns_only_params(
    obs_only: dict[str, list[str]] | list[str] | None, x_only: dict[str, list[str]] | list[str] | None
) -> None:
    """Check whether columns_obs_only and columns_x_only are passed exclusively.

    For a single AnnData object (thus the parameters being a list of strings), it is not possible to pass both
    obs_only and x_only. For multiple AnnData objects (thus the parameters being dicts with string keys and list
    values), it is possible to pass both, but the keys (the unique identifiers of the AnnData objects, basically
    their names) should share no common identifier: a single AnnData object is either in x_only OR obs_only,
    but not in both.
    """
    if not obs_only or not x_only:
        return
    if isinstance(obs_only, list):
        raise ValueError(
            "Cannot use columns_obs_only together with columns_x_only for a single AnnData object. "
            "At least one has to be None!"
        )
    else:
        common_keys = obs_only.keys() & x_only.keys()  # type: ignore
        if common_keys:
            raise ValueError(
                "Cannot use columns_obs_only together with columns_x_only for the same AnnData object. "
                f"The following AnnData identifiers were found in both: {','.join(common_keys)}!"
            )


class IndexNotFoundError(Exception):
    pass


class CachingNotSupported(Exception):
    pass


class ExtensionMissingError(Exception):
    pass


class CacheExistsException(Exception):
    pass

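# A short sketch of the exclusivity rule enforced by _check_columns_only_params
# (hypothetical identifiers): with plain lists, only one of the two parameters may be
# set; with dicts, both may be set as long as their key sets are disjoint:
#
#   >>> _check_columns_only_params({"visits": ["note"]}, {"labs": ["age"]})  # ok: disjoint keys
#   >>> _check_columns_only_params(["note"], ["age"])  # raises ValueError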