Source code for pmaf.biome.essentials._samplemeta
import warnings
warnings.simplefilter("ignore", category=FutureWarning)
from pmaf.biome.essentials._metakit import EssentialSampleMetabase
from pmaf.biome.essentials._base import EssentialBackboneBase
from os import path
import pandas as pd
import numpy as np
from collections import defaultdict
import biom
from typing import Union, Optional, Tuple, Callable, Any
from pmaf.internal._typing import AnyGenericIdentifier, Mapper
[docs]class SampleMetadata(EssentialBackboneBase, EssentialSampleMetabase):
"""An `essential` class for handling sample metadata."""
def __init__(
self,
samples: Union[pd.DataFrame, str],
axis: Union[int, str] = 1,
index_col: Union[str, int] = 0,
**kwargs: Any
) -> None:
"""Constructor for :class:`.SampleMetadata`
Parameters
----------
samples
Data containing sample metadata
axis
Sample index axis. Using 0/`index` sets rows as sample indices while 1/`columns` sets columns as indices.
index_col
Which row/column to use as index.
kwargs
Passed to :func:`~pandas.read_csv` or :mod:`biome` loader.
"""
tmp_sample = None
tmp_metadata = kwargs.pop("metadata", {})
if axis in [0, 1, "index", "columns"]:
tmp_axis = 0 if axis in [0, "index"] else 1
else:
raise ValueError("`axis` is invalid.")
if isinstance(samples, pd.DataFrame):
if samples.shape[1] > 0:
tmp_sample = samples
else:
raise ValueError("Provided `samples` Datafame is invalid.")
elif isinstance(samples, str):
if path.isfile(samples):
file_extension = path.splitext(samples)[-1].lower()
if file_extension in [".csv", ".tsv"]:
tmp_sample = pd.read_csv(samples, index_col=index_col, **kwargs)
elif file_extension in [".biom", ".biome"]:
tmp_frequency, new_metadata = self.__load_biom(samples, **kwargs)
tmp_metadata.update({"biom": new_metadata})
else:
raise NotImplementedError("File type is not supported.")
else:
raise FileNotFoundError("Provided `samples` file path is invalid.")
else:
raise TypeError("Provided `samples` has invalid type.")
tmp_sample = tmp_sample.T if tmp_axis == 1 else tmp_sample
self.__internal_samples = pd.concat(
[
tmp_sample.select_dtypes([], ["object"]),
tmp_sample.select_dtypes(["object"]).apply(
pd.Series.astype, dtype="category"
),
],
axis=1,
).reindex(tmp_sample.columns, axis=1)
super().__init__(metadata=tmp_metadata, **kwargs)
[docs] @classmethod
def from_csv(cls, filepath: str, **kwargs: Any) -> "SampleMetadata":
"""Factory method to construct a :class:`.SampleMetadata` from CSV
file.
Parameters
----------
filepath
Path to .csv file.
kwargs
Passed to the constructor.
Returns
-------
Instance of class:`.SampleMetadata`
"""
tmp_sample = pd.read_csv(filepath, **kwargs)
tmp_metadata = kwargs.pop("metadata", {})
tmp_metadata.update({"filepath": path.abspath(filepath)})
return cls(samples=tmp_sample, metadata=tmp_metadata, **kwargs)
[docs] @classmethod
def from_biom(cls, filepath: str, **kwargs) -> "SampleMetadata":
"""Factory method to construct a :class:`.SampleMetadata` from
:mod:`biom` file.
Parameters
----------
filepath
Path to :mod:`biom` file
kwargs
Passed to the constructor
Returns
-------
Instance of class:`.SampleMetadata`
"""
samples_frame, new_metadata = cls.__load_biom(filepath, **kwargs)
tmp_metadata = kwargs.pop("metadata", {})
tmp_metadata.update({"biom": new_metadata})
return cls(frequency=samples_frame, metadata=tmp_metadata, **kwargs)
@classmethod
def __load_biom(cls, filepath: str, **kwargs: Any) -> Tuple[pd.DataFrame, dict]:
"""Actual private method to process :mod:`biom` file.
Parameters
----------
filepath
:mod:`biom` file path
kwargs
Compatibility
"""
biom_file = biom.load_table(filepath)
if biom_file.metadata(axis="sample") is not None:
sample_data = biom_file.metadata_to_dataframe("sample")
else:
raise ValueError("Biom file does not contain sample metadata.")
return sample_data, {}
def _rename_samples_by_map(self, map_like: Mapper, **kwargs) -> Optional[Mapper]:
"""Rename sample names by map and ratify action.
Parameters
----------
map_like
Mapper to use for renaming.
kwargs
Compatibility
"""
self.__internal_samples.rename(mapper=map_like, axis=0, inplace=True)
return self._ratify_action("_rename_samples_by_map", map_like, **kwargs)
def _remove_samples_by_id(
self, ids: AnyGenericIdentifier, **kwargs
) -> Optional[AnyGenericIdentifier]:
"""Remove samples by sample ids and ratify action.
Parameters
----------
ids
Sample identifiers
kwargs
Compatibility
"""
tmp_ids = np.asarray(ids, dtype=self.__internal_samples.index.dtype)
if len(tmp_ids) > 0:
self.__internal_samples.drop(tmp_ids, inplace=True)
return self._ratify_action("_remove_samples_by_id", ids, **kwargs)
def _merge_samples_by_map(
self,
map_dict: Mapper,
aggfunc: Union[str, Callable] = "mean",
variable: Union[str, int, None] = None,
**kwargs
) -> Optional[Mapper]:
"""Merge samples and ratify action.
Parameters
----------
map_dict
Map to use for merging
aggfunc
Aggregation function
variable
Compatibility
kwargs
Compatibility
"""
tmp_agg_dict = defaultdict(list)
for new_id, group in map_dict.items():
tmp_agg_dict[new_id] = (
self.__internal_samples.loc[group, :]
.agg(func=aggfunc, axis=0)
.to_dict()
)
tmp_samples = pd.DataFrame.from_dict(tmp_agg_dict, orient="index")
tmp_samples.index.name = self.__internal_samples.index.name
self.__internal_samples = tmp_samples
return self._ratify_action(
"_merge_samples_by_map", map_dict, aggfunc=aggfunc, **kwargs
)
[docs] def rename_samples(self, mapper: Mapper) -> None:
"""Rename sample names by `mapper`
Parameters
----------
mapper
Dict-like mapper use for renaming.
"""
if isinstance(mapper, dict) or callable(mapper):
if isinstance(mapper, dict):
if self.__internal_samples.index.isin(list(mapper.keys())).sum() == len(
mapper
):
self._rename_samples_by_map(mapper)
else:
raise ValueError("Invalid sample ids are provided.")
else:
self._rename_samples_by_map(mapper)
else:
raise TypeError("Invalid `mapper` type.")
[docs] def drop_sample_by_id(
self, ids: AnyGenericIdentifier, **kwargs
) -> Optional[AnyGenericIdentifier]:
"""Drop samples by sample identifiers.
Parameters
----------
ids
Identifiers to remove
kwargs
Compatibility
"""
target_ids = np.asarray(ids)
if self.xsid.isin(target_ids).sum() == len(target_ids):
return self._remove_samples_by_id(target_ids, **kwargs)
else:
raise ValueError("Invalid sample ids are provided.")
[docs] def get_variables_by_id(
self,
ids: Optional[AnyGenericIdentifier] = None,
variables: Optional[AnyGenericIdentifier] = None,
) -> Union[pd.Series, pd.DataFrame, str, int]:
"""Get sample metadata by sample identifiers and variables.
Parameters
----------
ids
Sample identifiers
variables
Metadata variables
Returns
-------
class:`~pandas.DataFrame`
"""
if ids is None:
target_ids = self.xsid
else:
target_ids = np.asarray(ids)
if variables is None:
target_vars = self.variables
else:
target_vars = np.asarray(variables)
if (
self.__internal_samples.index.isin(target_ids).sum() <= len(target_ids)
) and (
self.__internal_samples.columns.isin(target_vars).sum() <= len(target_vars)
):
return self.__internal_samples.loc[target_ids, target_vars]
else:
raise ValueError("Invalid sample ids or variables are provided.")
[docs] def merge_samples_by_variable(
self,
variable: Union[str, int],
aggfunc: Union[str, Callable] = "mean",
**kwargs
) -> Optional[Mapper]:
"""Merge samples by `variable`.
Parameters
----------
variable
Sample metadata variable.
aggfunc
Aggregation function that will be applied to both :class:`.SampleMetadata`
instance and ratified to other `essentials`
if contained in :class:`~pmaf.biome.assembly.BiomeAssembly` instance.
kwargs
Compatibility
"""
ret = {}
if variable not in self.__internal_samples.columns:
raise TypeError("`variable` is invalid.")
groups = self.__internal_samples.groupby(variable)
if len(groups.groups) > 1:
tmp_variable = []
tmp_groups = []
group_indices = []
for var, sample_ids in groups.groups.items():
tmp_variable.append(var)
tmp_groups.append(list(sample_ids))
group_indices.append(var)
ret = dict(zip(group_indices, tmp_groups))
return self._merge_samples_by_map(
ret, aggfunc=aggfunc, variable=variable, **kwargs
)
[docs] def copy(self) -> "SampleMetadata":
"""Copy of the instance."""
return type(self)(
samples=self.__internal_samples.copy(),
axis=0,
metadata=self.metadata,
name=self.name,
)
[docs] def get_subset(
self, sids: AnyGenericIdentifier = None, *args, **kwargs
) -> "SampleMetadata":
"""Get subset of the :class:`.SampleMetadata`.
Parameters
----------
sids
Sample Identifiers
args
Compatibility
Returns
-------
Instance of class:`.SampleMetadata`
"""
if sids is None:
target_sids = self.xsid
else:
target_sids = np.asarray(sids).astype(self.__internal_samples.index.dtype)
if not self.xsid.isin(target_sids).sum() == len(target_sids):
raise ValueError("Invalid sample ids are provided.")
return type(self)(
samples=self.__internal_samples.loc[target_sids, :],
axis=0,
metadata=self.metadata,
name=self.name,
)
def _export(self, *args, **kwargs) -> Tuple[pd.DataFrame, dict]:
"""Present only for backward compatibility with other `essentials`."""
return self.data, kwargs
[docs] def export(
self, output_fp: str, *args, _add_ext: bool = False, sep: str = ",", **kwargs
) -> None:
"""Exports the sample metadata content into the specified file.
Parameters
----------
output_fp
Export filepath
args
Compatibility
_add_ext
Add file extension or not.
sep
Delimiter
kwargs
Compatibility
"""
tmp_export, rkwarg = self._export(*args, **kwargs)
if _add_ext:
tmp_export.to_csv("{}.csv".format(output_fp), sep=sep)
else:
tmp_export.to_csv(output_fp, sep=sep)
@property
def variables(self) -> np.ndarray:
"""Sample metadata variables."""
return self.__internal_samples.columns.values
@property
def data(self) -> pd.DataFrame:
"""Sample metadata."""
return self.__internal_samples
@property
def xsid(self) -> pd.Index:
"""Sample identifiers."""
return self.__internal_samples.index