from __future__ import annotations
from typing import TYPE_CHECKING
import numpy as np
from ehrapy._compat import use_ehrdata
if TYPE_CHECKING:
from anndata import AnnData
from ehrdata import EHRData
[docs]
@use_ehrdata(deprecated_after="1.0.0")
def ncp(
edata: EHRData | AnnData,
*,
layer: str,
rank: int = 4,
n_iter_max: int = 300,
init: str = "random",
sigmoid_transform: bool = False,
key_added: str = "ncp",
random_state: int = 0,
copy: bool = False,
) -> EHRData | AnnData | None:
r"""Non-negative CP (PARAFAC) decomposition of a 3D temporal layer.
Decomposes the stored 3D data into three factor matrices (all factors non-negative).
Uses :func:`tensorly.decomposition.non_negative_parafac`.
Args:
edata: Central data object.
layer: Key of the 3D layer to decompose (shape ``n_obs × n_vars × n_time``).
rank: Number of components (rank of the decomposition).
n_iter_max: Maximum number of ALS iterations.
init: Initialisation strategy passed to :func:`~tensorly.decomposition.non_negative_parafac` (``"random"`` or ``"svd"``).
sigmoid_transform: If ``True``, apply a sigmoid transformation to the layer
before decomposition. Useful when the layer contains raw logits.
key_added: Key prefix for storing results. Results are stored as
``edata.obsm["X_{key_added}"]`` (sample factors, shape ``n_obs × rank``),
``edata.varm["{key_added}_loadings"]`` (variable factors, shape ``n_vars × rank``),
and ``edata.uns["{key_added}"]`` (temporal factors + metadata).
random_state: Random seed for reproducibility.
copy: Whether to return a copy rather than modifying in place.
Returns:
``None`` if ``copy=False``, else a modified copy of ``edata``.
Examples:
>>> import numpy as np, pandas as pd
>>> import ehrdata as ed, ehrapy as ep
>>> np.random.seed(0)
>>> tensor = np.abs(np.random.randn(30, 8, 12)) # patients × vars × time
>>> edata = ed.EHRData(
... shape=(30, 8),
... layers={"data": tensor},
... var=pd.DataFrame(index=[f"var_{i}" for i in range(8)]),
... )
>>> ep.tl.ncp(edata, layer="data", rank=3)
>>> edata.obsm["X_ncp"].shape # (30, 3) – sample factors
>>> edata.varm["ncp_loadings"].shape # (8, 3) – variable factors
>>> edata.uns["ncp"]["temporal_factors"].shape # (12, 3) – time factors
"""
if layer not in edata.layers:
raise KeyError(f"Layer {layer!r} not found in edata.layers. Available: {list(edata.layers)}")
tensor = np.asarray(edata.layers[layer], dtype=np.float64)
if tensor.ndim != 3:
raise ValueError(f"Layer {layer!r} must be 3D (n_obs × n_vars × n_time), got shape {tensor.shape}.")
if sigmoid_transform:
from scipy.special import expit
tensor = expit(tensor)
edata = edata.copy() if copy else edata
from tensorly.decomposition import non_negative_parafac
weights, factors = non_negative_parafac(
tensor, rank=rank, init=init, n_iter_max=n_iter_max, random_state=random_state
)
A, B, C = (np.asarray(f) for f in factors)
# absorb weights into the sample factor so each component is self-contained
A = A * np.asarray(weights)[np.newaxis, :]
edata.obsm[f"X_{key_added}"] = A # (n_obs, rank)
edata.varm[f"{key_added}_loadings"] = B # (n_vars, rank)
edata.uns[key_added] = {
"params": {
"layer": layer,
"rank": rank,
"n_iter_max": n_iter_max,
"init": init,
"sigmoid_transform": sigmoid_transform,
},
"temporal_factors": C, # (n_time, rank)
}
return edata if copy else None