# Source code for pmaf.biome.essentials._tree

import warnings

warnings.simplefilter("ignore", category=FutureWarning)
from pmaf.biome.essentials._metakit import EssentialFeatureMetabase
from pmaf.biome.essentials._base import EssentialBackboneBase
import pandas as pd
import numpy as np
from pmaf.phylo.tree._tree import PhyloTree
from pmaf.phylo.tree._backends import TreeEte3Base
from io import StringIO
from typing import Union, Optional, Tuple, Any
from pmaf.internal._typing import AnyGenericIdentifier, Mapper


class RepPhylogeny(EssentialBackboneBase, EssentialFeatureMetabase):
    """An `essential` class for handling phylogeny data.

    The instance keeps a :class:`~pmaf.phylo.tree._tree.PhyloTree`, the
    identifiers of the features (tips) it represents and an optional mapping
    of tip annotations.
    """

    def __init__(
        self,
        tree: Union[PhyloTree, TreeEte3Base, StringIO, str],
        feature_ids: Optional[AnyGenericIdentifier] = None,
        prune: bool = False,
        annotation: Union[dict, pd.Series, None] = None,
        copy: bool = True,
        ignore_polytomy: bool = False,
        **kwargs: Any
    ) -> None:
        """Constructor for :class:`.RepPhylogeny`

        Parameters
        ----------
        tree
            Phylogeny data
        feature_ids
            Target feature identifiers
        prune
            Whether to prune for `feature_ids`
        annotation
            Annotations for tips
        copy
            Whether to copy the original tree. May require long time if tree
            is large
        ignore_polytomy
            If False (default) tree polytomies are resolved during
            construction; if True the tree is left as is
        kwargs
            Compatibility

        Raises
        ------
        ValueError
            If `feature_ids` contain duplicates or ids missing from the tree,
            if the tree has no nodes, if tip names are not unique, or if some
            tips are missing from a non-empty `annotation`.
        TypeError
            If `annotation` is neither None, a dict nor a
            :class:`pandas.Series`.
        """
        super().__init__(**kwargs)
        if feature_ids is None:
            tmp_feature_ids = np.asarray([])
        else:
            tmp_feature_ids = np.asarray(feature_ids)
        if len(np.unique(tmp_feature_ids)) < len(tmp_feature_ids):
            raise ValueError("`feature_ids` cannot contain duplicates.")

        if annotation is None:
            tmp_annotation = {}
        elif isinstance(annotation, dict):
            tmp_annotation = annotation
        elif isinstance(annotation, pd.Series):
            tmp_annotation = annotation.to_dict()
        else:
            raise TypeError("`annotation` can be None or dict-like")

        if isinstance(tree, PhyloTree):
            if tree.total_nodes > 0:
                tmp_tree = tree.copy() if copy else tree
            else:
                raise ValueError("Provided `tree` has no nodes")
        else:
            tmp_tree = PhyloTree(tree, copy=copy)

        tmp_node_names = tmp_tree.node_names
        if len(tmp_feature_ids) > 0:
            if len(tmp_node_names) == 0:
                # Without node names there is nothing to match the ids against
                # (and no name type to cast them to).
                raise ValueError("Provided `tree` has no nodes")
            # Cast the requested ids to the type used for the tree node names
            # so that membership tests compare like with like.
            node_name_type = type(tmp_node_names[0])
            tmp_feature_ids_adj = [node_name_type(fid) for fid in tmp_feature_ids]
            known_node_names = set(tmp_node_names)
            if not all(fid in known_node_names for fid in tmp_feature_ids_adj):
                raise ValueError(
                    "Some ids in `feature_ids` are not present in `tree` nodes."
                )
            tmp_tip_ids = tmp_feature_ids_adj
        else:
            tmp_feature_ids_adj = []
            tmp_tip_ids = tmp_tree.tip_names

        if len(tmp_tip_ids) != len(set(tmp_tip_ids)):
            raise ValueError("Tree tips must have unique names.")
        if prune and len(tmp_feature_ids_adj) > 0:
            tmp_tree.prune_by_ids(tmp_tip_ids)

        if len(tmp_annotation) > 0:
            # Annotation keys are cast to the tip id type before matching.
            tip_id_type = type(tmp_tip_ids[0])
            tmp_annotation_adj = {
                tip_id_type(key): value for key, value in tmp_annotation.items()
            }
            # Every tip must have an annotation; extra annotation keys are
            # tolerated.
            if not all(tip_id in tmp_annotation_adj for tip_id in tmp_tip_ids):
                raise ValueError(
                    "Some indices in `annotation` are not present in `tree` nodes."
                )
        else:
            tmp_annotation_adj = {}

        tmp_tree.sort_by_name()
        tmp_tree.clear_internal_node_names()
        if not ignore_polytomy:
            tmp_tree.resolve_polytomy()

        self.__feature_ids = np.asarray(tmp_tip_ids)
        # PhyloTree class uses ete3 trees, which are unstable when tips are
        # not strings. Therefore, the dtype of the internal `__feature_ids`
        # and of the external `xrid` are tracked separately.
        self.__feature_ids_dtype = self.__feature_ids.dtype
        self.__internal_tree = tmp_tree
        self.__annotations = tmp_annotation_adj

    def _remove_features_by_id(
        self, ids: AnyGenericIdentifier, **kwargs: Any
    ) -> Optional[AnyGenericIdentifier]:
        """Remove features by feature ids and ratify action.

        Parameters
        ----------
        ids
            Feature identifiers
        kwargs
            Compatibility

        Returns
        -------
            Whatever `_ratify_action` returns for this action.
        """
        # Cast with the scalar type rather than the concrete dtype: a
        # fixed-width unicode dtype (e.g. ``<U2``) would silently truncate
        # longer ids and could remove the wrong feature.
        tmp_ids = np.asarray(ids).astype(self.__feature_ids.dtype.type)
        if len(tmp_ids) > 0:
            for node_name in tmp_ids:
                self.__internal_tree.remove_node_by_name(str(node_name))
            # Boolean masking keeps the dtype even when nothing is left.
            keep_mask = ~np.isin(self.__feature_ids, tmp_ids)
            self.__feature_ids = self.__feature_ids[keep_mask]
        return self._ratify_action("_remove_features_by_id", ids, **kwargs)

    def _merge_features_by_map(
        self,
        map_dict: Mapper,
        _annotations: Union[dict, pd.Series, None] = None,
        **kwargs: Any
    ) -> Optional[Mapper]:
        """Merge features by map.

        Parameters
        ----------
        map_dict
            Map to use for merging. Keys are the new feature ids, values are
            the groups of current feature ids to merge.
        _annotations
            New annotations to be passed to `essential` class
            :class:`~pmaf.biome.essentials._taxonomy.RepTaxonomy`
        kwargs
            Compatibility

        Returns
        -------
            Whatever `_ratify_action` returns for this action.
        """
        new_tips = []
        for new_id, group in map_dict.items():
            tmp_new_name = str(new_id)
            new_tips.append(tmp_new_name)
            if len(group) > 1:
                tmp_new_node = self.__internal_tree.merge_nodes(list(map(str, group)))
            else:
                tmp_new_node = self.__internal_tree.get_node_by_name(str(group[0]))
            tmp_new_node.name = tmp_new_name
        # External ids take the type of the new (merged) identifiers.
        self.__feature_ids_dtype = type(list(map_dict)[0])
        self.__internal_tree.prune_by_ids(new_tips)
        if _annotations is not None:
            self.__annotations = {
                str(tid): annot for tid, annot in _annotations.items()
            }
        else:
            self.__annotations = {}
        self.__feature_ids = np.array(new_tips, dtype=str)
        return self._ratify_action(
            "_merge_features_by_map", map_dict, _annotations=_annotations, **kwargs
        )

    def __make_annotated_tree(self) -> PhyloTree:
        """Create a copy of the tree with annotated tips.

        Returns
        -------
            Annotated tree of class :class:`~pmaf.phylo.tree._tree.PhyloTree`
        """
        tmp_annotated_tree = self.__internal_tree.copy()
        tmp_annotated_tree.annotate_nodes_by_map(self.__annotations, only_tips=True)
        tmp_annotated_tree.sort_by_name()
        return tmp_annotated_tree

    def __select_tree(self, annotated: bool) -> PhyloTree:
        """Return the internal tree or, if requested, an annotated copy.

        Parameters
        ----------
        annotated
            Whether the annotated tree is requested

        Raises
        ------
        RuntimeError
            If the annotated tree is requested but no annotations are stored.
        """
        if not annotated:
            return self.__internal_tree
        if len(self.__annotations) > 0:
            return self.__make_annotated_tree()
        raise RuntimeError("Annotations are not present.")

    def get_annotated_tree(self) -> PhyloTree:
        """Retrieves annotated tree.

        Returns
        -------
            Annotated tree of class :class:`~pmaf.phylo.tree.PhyloTree`

        Raises
        ------
        RuntimeError
            If no annotations are present.
        """
        return self.__select_tree(True)

    def resolve_polytomy(self) -> None:
        """Resolve tree polytomy."""
        self.__internal_tree.resolve_polytomy()

    def render_art(self, output_fp: str, annotated: bool = False) -> Any:
        """Renders tree into file.

        Parameters
        ----------
        output_fp
            File to render into. File format depends on the extension.
            For example, .pdf will produce PDF file while .png will produce
            PNG file.
        annotated
            Whether to create tree with annotated tips or not.

        Raises
        ------
        RuntimeError
            If `annotated` is requested but no annotations are present.
        """
        return self.__select_tree(annotated).render(output_fp)

    def get_ascii_art(self, annotated: bool = False) -> str:
        """Creates ASCII art of the tree.

        Parameters
        ----------
        annotated
            Whether to create tree with annotated tips or not

        Returns
        -------
            String with ASCII art

        Raises
        ------
        RuntimeError
            If `annotated` is requested but no annotations are present.
        """
        return self.__select_tree(annotated).get_ascii_art()

    def _export(
        self, rooted: bool = False, annotated: bool = False, **kwargs: Any
    ) -> Tuple[str, dict]:
        """Creates the Newick formatted tree for export.

        Parameters
        ----------
        rooted
            Whether to create rooted tree or not
        annotated
            Whether to create tree with annotated tips or not
        kwargs
            Compatibility. Forwarded to the tree's `get_newick_str`.

        Returns
        -------
            Tuple of the Newick string and the forwarded `kwargs`.

        Raises
        ------
        RuntimeError
            If `annotated` is requested but no annotations are present.
        """
        # Node names are quoted only for the annotated tree.
        newick_str = self.__select_tree(annotated).get_newick_str(
            root_node=rooted, quoted_nodes=bool(annotated), **kwargs
        )
        return newick_str, kwargs

    def export(self, output_fp: str, _add_ext: bool = False, **kwargs: Any) -> None:
        """Exports the Newick formatted phylogenetic tree into specified file.

        Parameters
        ----------
        output_fp
            Export filepath
        _add_ext
            Add file extension (``.tre``) or not
        kwargs
            Compatibility. Forwarded to `_export`.
        """
        tmp_export, _ = self._export(**kwargs)
        tmp_filepath = "{}.tre".format(output_fp) if _add_ext else output_fp
        with open(tmp_filepath, "w") as output:
            output.write(tmp_export)

    def copy(self) -> "RepPhylogeny":
        """Copy of the instance."""
        return type(self)(
            tree=self.__internal_tree,
            feature_ids=self.__feature_ids,
            annotation=self.__annotations,
            copy=True,
            metadata=self.metadata,
            name=self.name,
        )

    def get_subset(
        self, rids: Optional[AnyGenericIdentifier] = None, *args, **kwargs: Any
    ) -> "RepPhylogeny":
        """Get subset of the :class:`.RepPhylogeny`.

        Parameters
        ----------
        rids
            Feature Identifiers. If None, all features are kept.
        args
            Compatibility
        kwargs
            Compatibility

        Returns
        -------
            Instance of :class:`.RepPhylogeny`

        Raises
        ------
        ValueError
            If some of `rids` are not among the feature ids of the instance.
        """
        if rids is None:
            target_rids = self.__feature_ids
        else:
            target_rids = np.asarray(rids).astype(str)
            if not np.isin(target_rids, self.__feature_ids).all():
                raise ValueError("Invalid feature ids are provided.")
        tmp_tree = self.__internal_tree.copy()
        tmp_tree.prune_by_ids(target_rids)
        # Keep only annotations of the retained tips so that the subset does
        # not carry annotations for tips that were pruned away.
        retained = {str(rid) for rid in target_rids}
        tmp_annotations = {
            key: value
            for key, value in self.__annotations.items()
            if str(key) in retained
        }
        return type(self)(
            tree=tmp_tree,
            # The pruned tree only contains the requested tips, so the new
            # instance must be built for those ids, not for all current ones.
            feature_ids=target_rids,
            annotation=tmp_annotations,
            # `tmp_tree` is already a private copy; avoid copying it again.
            copy=False,
            metadata=self.metadata,
            name=self.name,
        )

    def write(self, output_fp: str, mode: str = "w", **kwargs: Any) -> None:
        """Writes the Newick tree into specified file.

        Parameters
        ----------
        output_fp
            Output filepath
        mode
            File write mode.
        kwargs
            Compatibility. Forwarded to `_export`.
        """
        tmp_export, _ = self._export(**kwargs)
        with open(output_fp, mode) as output:
            output.write(tmp_export)

    @property
    def data(self) -> PhyloTree:
        """Phylogenetic Tree."""
        return self.__internal_tree

    @property
    def annotations(self) -> dict:
        """Tip Annotations."""
        return self.__annotations

    @property
    def xrid(self) -> AnyGenericIdentifier:
        """Feature identifiers."""
        return pd.Index(self.__feature_ids).astype(self.__feature_ids_dtype)